"""Error Reduction Rate (ERR) metric for lexical normalization."""
|
|
| from typing import List, Tuple |
|
|
| import datasets |
| import numpy as np |
| from Levenshtein import distance as levenshtein_distance |
| from scipy.optimize import linear_sum_assignment |
|
|
| import evaluate |
|
|
|
|
| _DESCRIPTION = """ |
| Unofficial implementation of the Error Reduction Rate (ERR) metric introduced for lexical normalization. |
| This implementation works on Seq2Seq models by aligning the predictions with the ground truth outputs. |
| """ |
|
|
|
|
| _KWARGS_DESCRIPTION = """ |
| Args: |
| predictions (`list` of `str`): Predicted labels. |
| references (`list` of `Dict[str, str]`): Ground truth sentences, each with a field `input` and `output`. |
| Returns: |
| `err` (`float` or `int`): Error Reduction Rate. See here: http://noisy-text.github.io/2021/multi-lexnorm.html |
| `err_tp` (`int`): Number of true positives. |
| `err_fn` (`int`): Number of false negatives. |
| `err_tn` (`int`): Number of true negatives. |
| `err_fp` (`int`): Number of false positives. |
| Examples: |
| Example 1-A simple example |
| >>> err = evaluate.load("err") |
| >>> results = err.compute(predictions=[["The", "large", "dog"]], references=[{"input": ["The", "large", "dawg"], "output": ["The", "large", "dog"]}]) |
| >>> print(results) |
| {'err': 1.0, 'err_tp': 2, 'err_fn': 0, 'err_tn': 1, 'err_fp': 0} |
| """ |
|
|
|
|
# BibTeX entry handed to MetricInfo as the citation for this metric.
# NOTE(review): this entry is the W-NUT 2015 shared-task paper, whereas the
# ERR link in the argument description points to the MultiLexNorm 2021 task
# page -- confirm this is the intended reference.
_CITATION = """
@inproceedings{baldwin-etal-2015-shared,
title = "Shared Tasks of the 2015 Workshop on Noisy User-generated Text: {T}witter Lexical Normalization and Named Entity Recognition",
author = "Baldwin, Timothy and
de Marneffe, Marie Catherine and
Han, Bo and
Kim, Young-Bum and
Ritter, Alan and
Xu, Wei",
booktitle = "Proceedings of the Workshop on Noisy User-generated Text",
month = jul,
year = "2015",
address = "Beijing, China",
publisher = "Association for Computational Linguistics",
url = "https://aclanthology.org/W15-4319",
doi = "10.18653/v1/W15-4319",
pages = "126--135",
}
"""
|
|
|
|
@evaluate.utils.file_utils.add_start_docstrings(_DESCRIPTION, _KWARGS_DESCRIPTION)
class ErrorReductionRate(evaluate.Metric):
    """Error Reduction Rate (ERR) over token-level lexical normalization.

    Every input token is classified along two axes: whether it needs
    normalization (its lower-cased input differs from the gold output) and
    whether the aligned prediction equals the gold output. The score is
    ``(TP - FP) / (TP + FN)``.
    """

    def _info(self):
        """Return the metric metadata and the expected input feature schema."""
        return evaluate.MetricInfo(
            description=_DESCRIPTION,
            citation=_CITATION,
            inputs_description=_KWARGS_DESCRIPTION,
            features=datasets.Features(
                {
                    "predictions": datasets.Sequence(datasets.Value("string")),
                    "references": {
                        "input": datasets.Sequence(datasets.Value("string")),
                        "output": datasets.Sequence(datasets.Value("string")),
                    },
                }
            ),
        )

    def _compute(self, predictions, references):
        """Accumulate TP/FN/TN/FP over all sentences and derive the ERR.

        Args:
            predictions: One list of predicted words per sentence.
            references: One mapping per sentence with the keys ``"input"``
                (raw tokens) and ``"output"`` (gold normalization of each raw
                token; may contain several space-separated words or be empty).

        Returns:
            A dict with the keys ``err``, ``err_tp``, ``err_fn``, ``err_tn``
            and ``err_fp``. ``err`` is ``0.0`` when no token requires
            normalization, since the ratio is undefined in that case.

        Raises:
            ValueError: If a sentence's number of predicted words differs from
                its number of gold words.
        """
        tp = fn = tn = fp = 0
        for pred, ref in zip(predictions, references):
            inputs, outputs = ref["input"], ref["output"]

            # One label per gold word, tagged with the index of the input
            # token it belongs to.
            labels = self._split_expressions_into_tokens(outputs)

            # Raised explicitly (instead of `assert`) so the check is not
            # stripped when Python runs with -O.
            if len(pred) != len(labels):
                raise ValueError(
                    f"Number of predicted words ({len(pred)}) does not match number of target words ({len(labels)})"
                )

            formatted_preds = self._align_predictions_with_labels(pred, labels)

            for i, raw in enumerate(inputs):
                gold = outputs[i]
                is_correct = formatted_preds[i] == gold
                # Only the raw input is lower-cased before the comparison.
                if raw.lower() != gold:
                    # The token needed normalization.
                    if is_correct:
                        tp += 1
                    else:
                        fn += 1
                elif is_correct:
                    tn += 1
                else:
                    fp += 1

        # Guard against sentences in which nothing had to be normalized.
        needs_normalization = tp + fn
        err = (tp - fp) / needs_normalization if needs_normalization else 0.0

        return {"err": err, "err_tp": tp, "err_fn": fn, "err_tn": tn, "err_fp": fp}

    def _align_predictions_with_labels(self, predictions: List[str], labels: List[Tuple[str, int]]) -> List[str]:
        """Match predicted words to gold words and regroup them per input token.

        A minimum-cost assignment over pairwise Levenshtein distances decides
        which predicted word belongs to which gold word. The words assigned to
        the gold words of one input token are then joined with single spaces.

        Args:
            predictions: Predicted words of one sentence.
            labels: ``(gold_word, segment)`` pairs as produced by
                ``_split_expressions_into_tokens``.

        Returns:
            One string per segment index ``0..max(segment)``, or an empty list
            when ``labels`` is empty.
        """
        if not labels:
            # Nothing to align; also avoids calling max() on an empty sequence.
            return []

        # Rows index gold words, columns index predicted words.
        cost = np.zeros((len(labels), len(predictions)))
        for i, (label, _) in enumerate(labels):
            for j, pred in enumerate(predictions):
                cost[i, j] = levenshtein_distance(label, pred)

        # linear_sum_assignment returns (row indices, column indices) with the
        # row indices sorted, so the pairs are visited in gold-word order and
        # each gold word receives the prediction assigned to it.
        label_indices, pred_indices = linear_sum_assignment(cost)

        num_outputs = max(segment for _, segment in labels) + 1
        grouped = [[] for _ in range(num_outputs)]
        for label_idx, pred_idx in zip(label_indices, pred_indices):
            grouped[labels[label_idx][1]].append(predictions[pred_idx])

        return [" ".join(words) for words in grouped]

    def _split_expressions_into_tokens(self, outputs: List[str]) -> List[Tuple[str, int]]:
        """Flatten gold outputs into ``(word, segment)`` pairs.

        Args:
            outputs: Gold normalization for each input token.

        Returns:
            One pair per whitespace-separated word of every output, where
            ``segment`` is the index of the output it came from. An empty
            output contributes a single ``("", segment)`` pair.
        """
        labels = []
        for segment, normalized in enumerate(outputs):
            if normalized == "":
                labels.append(("", segment))
            else:
                labels.extend((word, segment) for word in normalized.split())
        return labels
|
|