finmmeval-task2-final-portal / evaluate_task2_submission.py
Zhuohan's picture
Deploy Task 2 final-test portal
4f16637 verified
#!/usr/bin/env python3
"""Evaluate Task 2 JSONL submissions against private gold answers."""
from __future__ import annotations
import argparse
import json
import re
import sys
import unicodedata
from collections import Counter, defaultdict
from pathlib import Path
from statistics import mean
from typing import Any
TOKEN_RE = re.compile(
r"[\u4e00-\u9fff]|[\u3040-\u30ff]|[\u0600-\u06ff]|"
r"[a-zà-öø-ÿ\u0370-\u03ff]+|\d+(?:[.,]\d+)?",
re.IGNORECASE,
)
MAX_ANSWER_WORDS = 100
def load_jsonl(path: Path, display_name: str | None = None) -> list[dict[str, Any]]:
label = display_name or str(path)
rows: list[dict[str, Any]] = []
with path.open("r", encoding="utf-8") as handle:
for line_number, line in enumerate(handle, start=1):
stripped = line.strip()
if not stripped:
continue
try:
row = json.loads(stripped)
except json.JSONDecodeError as exc:
raise ValueError(f"{label}: line {line_number}: invalid JSON: {exc.msg}") from exc
if not isinstance(row, dict):
raise ValueError(f"{label}: line {line_number}: expected a JSON object")
rows.append(row)
return rows
def tokenize(text: str) -> list[str]:
normalized = unicodedata.normalize("NFKC", str(text or "")).lower()
return TOKEN_RE.findall(normalized)
def word_count(text: str) -> int:
return len(tokenize(text))
def rouge1(prediction: str, reference: str) -> dict[str, float]:
pred_tokens = tokenize(prediction)
ref_tokens = tokenize(reference)
if not pred_tokens or not ref_tokens:
score = 1.0 if pred_tokens == ref_tokens else 0.0
return {"precision": score, "recall": score, "f1": score}
pred_counts = Counter(pred_tokens)
ref_counts = Counter(ref_tokens)
overlap = sum((pred_counts & ref_counts).values())
precision = overlap / len(pred_tokens)
recall = overlap / len(ref_tokens)
f1 = 0.0 if precision + recall == 0 else 2 * precision * recall / (precision + recall)
return {"precision": precision, "recall": recall, "f1": f1}
def index_submission(rows: list[dict[str, Any]]) -> tuple[dict[str, str], dict[str, int]]:
answers: dict[str, str] = {}
counts: Counter[str] = Counter()
for row in rows:
task_id = str(row.get("task_id", "")).strip()
counts[task_id] += 1
if task_id and task_id not in answers:
answers[task_id] = str(row.get("answer", ""))
duplicates = {task_id: count for task_id, count in counts.items() if task_id and count > 1}
return answers, duplicates
def evaluate(gold_rows: list[dict[str, Any]], submission_rows: list[dict[str, Any]]) -> dict[str, Any]:
answers, duplicates = index_submission(submission_rows)
gold_by_id = {row["task_id"]: row for row in gold_rows}
submitted_ids = set(answers)
expected_ids = set(gold_by_id)
missing_ids = sorted(expected_ids - submitted_ids)
extra_ids = sorted(submitted_ids - expected_ids)
blank_answer_ids = sorted(task_id for task_id in expected_ids & submitted_ids if not answers[task_id].strip())
over_length_answer_ids = sorted(
task_id
for task_id in expected_ids & submitted_ids
if answers[task_id].strip() and word_count(answers[task_id]) > MAX_ANSWER_WORDS
)
scored_rows: list[dict[str, Any]] = []
tier_scores: dict[str, list[dict[str, float]]] = defaultdict(list)
tier_expected: dict[str, list[str]] = defaultdict(list)
for task_id, gold in gold_by_id.items():
scores = rouge1(answers.get(task_id, ""), gold["answer"])
tier = gold.get("tier", "unknown")
tier_expected[tier].append(task_id)
scored_rows.append({"task_id": task_id, "tier": tier, **scores})
tier_scores[tier].append(scores)
def aggregate(scores: list[dict[str, float]]) -> dict[str, float]:
if not scores:
return {"rouge1_precision": 0.0, "rouge1_recall": 0.0, "rouge1_f1": 0.0}
return {
"rouge1_precision": mean(score["precision"] for score in scores),
"rouge1_recall": mean(score["recall"] for score in scores),
"rouge1_f1": mean(score["f1"] for score in scores),
}
all_scores = [{"precision": row["precision"], "recall": row["recall"], "f1": row["f1"]} for row in scored_rows]
def completion_counts(task_ids: list[str]) -> dict[str, Any]:
task_id_set = set(task_ids)
present = task_id_set & submitted_ids
answered = {task_id for task_id in present if answers.get(task_id, "").strip()}
total = len(task_id_set)
return {
"total": total,
"submitted": len(present),
"answered": len(answered),
"coverage": len(present) / total if total else 0.0,
"answered_coverage": len(answered) / total if total else 0.0,
}
overall_counts = completion_counts(list(expected_ids))
by_tier_completion = {
tier: completion_counts(task_ids) for tier, task_ids in sorted(tier_expected.items())
}
return {
"num_gold": len(gold_rows),
"num_submitted_rows": len(submission_rows),
"num_submitted_unique_task_ids": len(submitted_ids),
"coverage": overall_counts["coverage"],
"answered_coverage": overall_counts["answered_coverage"],
"completed": (
not missing_ids
and not extra_ids
and not duplicates
and not blank_answer_ids
),
"missing_count": len(missing_ids),
"extra_count": len(extra_ids),
"duplicate_count": len(duplicates),
"blank_answer_count": len(blank_answer_ids),
"over_length_answer_count": len(over_length_answer_ids),
"max_answer_words": MAX_ANSWER_WORDS,
"completion": overall_counts,
"completion_by_tier": by_tier_completion,
"missing_task_ids": missing_ids,
"extra_task_ids": extra_ids,
"duplicate_task_ids": duplicates,
"blank_answer_task_ids": blank_answer_ids,
"over_length_answer_task_ids": over_length_answer_ids,
"overall": aggregate(all_scores),
"by_tier": {tier: aggregate(scores) for tier, scores in sorted(tier_scores.items())},
}
def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument("--gold", default="task2_november_test/outputs/gold_private/task2_test_gold.jsonl")
parser.add_argument("--submission", required=True)
parser.add_argument("--output")
args = parser.parse_args()
try:
result = evaluate(load_jsonl(Path(args.gold)), load_jsonl(Path(args.submission)))
except Exception as exc:
print(f"ERROR: {exc}", file=sys.stderr)
return 2
text = json.dumps(result, ensure_ascii=False, indent=2) + "\n"
if args.output:
Path(args.output).write_text(text, encoding="utf-8")
print(text, end="")
return 0
if __name__ == "__main__":
raise SystemExit(main())