import logging
from abc import ABCMeta, abstractmethod
from typing import Any, List, Optional, Sequence, Union

from torch import Tensor

from mmengine.dist import (broadcast_object_list, collect_results,
                           is_main_process)
from mmengine.fileio import dump
from mmengine.logging import print_log
from mmengine.registry import METRICS
from mmengine.structures import BaseDataElement
class BaseMetric(metaclass=ABCMeta):
    """Base class for a metric.

    The metric first processes each batch of data_samples and predictions,
    and appends the processed results to the results list. Then it
    collects all results together from all ranks if distributed training
    is used. Finally, it computes the metrics of the entire dataset.

    A subclass of :class:`BaseMetric` should assign a meaningful value to the
    class attribute ``default_prefix``. See the argument ``prefix`` for
    details.

    Args:
        collect_device (str): Device name used for collecting results from
            different ranks during distributed training. Must be 'cpu' or
            'gpu'. Defaults to 'cpu'.
        prefix (str, optional): The prefix that will be added to the metric
            names to disambiguate metrics of the same name computed by
            different evaluators. If prefix is not provided in the argument,
            ``self.default_prefix`` will be used instead. Defaults to None.
        collect_dir (str, optional): Synchronize directory for collecting
            data from different ranks. This argument should only be
            configured when ``collect_device`` is 'cpu'. Defaults to None.
            `New in version 0.7.3.`
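
    Examples:
        A minimal sketch of a subclass. ``ToyAccuracy`` and the
        ``pred_label``/``gt_label`` keys are hypothetical, chosen only for
        illustration:

        >>> class ToyAccuracy(BaseMetric):
        ...     default_prefix = 'toy'
        ...
        ...     def process(self, data_batch, data_samples):
        ...         # keep only what compute_metrics() needs
        ...         for sample in data_samples:
        ...             self.results.append(
        ...                 (sample['pred_label'], sample['gt_label']))
        ...
        ...     def compute_metrics(self, results):
        ...         correct = sum(pred == gt for pred, gt in results)
        ...         return dict(accuracy=correct / len(results))
        >>> metric = ToyAccuracy()
        >>> metric.process(None, [dict(pred_label=1, gt_label=1)])
        >>> metric.evaluate(size=1)  # single-process run
        {'toy/accuracy': 1.0}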
| """ |
|
|
| default_prefix: Optional[str] = None |
|
|
| def __init__(self, |
| collect_device: str = 'cpu', |
| prefix: Optional[str] = None, |
| collect_dir: Optional[str] = None) -> None: |
| if collect_dir is not None and collect_device != 'cpu': |
| raise ValueError('`collec_dir` could only be configured when ' |
| "`collect_device='cpu'`") |
|
|
| self._dataset_meta: Union[None, dict] = None |
| self.collect_device = collect_device |
| self.results: List[Any] = [] |
| self.prefix = prefix or self.default_prefix |
| self.collect_dir = collect_dir |
|
|
| if self.prefix is None: |
| print_log( |
| 'The prefix is not set in metric class ' |
| f'{self.__class__.__name__}.', |
| logger='current', |
| level=logging.WARNING) |
|
|
| @property |
| def dataset_meta(self) -> Optional[dict]: |
| """Optional[dict]: Meta info of the dataset.""" |
| return self._dataset_meta |
|
|
| @dataset_meta.setter |
| def dataset_meta(self, dataset_meta: dict) -> None: |
| """Set the dataset meta info to the metric.""" |
| self._dataset_meta = dataset_meta |
|
|
    @abstractmethod
    def process(self, data_batch: Any, data_samples: Sequence[dict]) -> None:
        """Process one batch of data samples and predictions.

        The processed results should be stored in ``self.results``, which
        will be used to compute the metrics when all batches have been
        processed.

        Args:
            data_batch (Any): A batch of data from the dataloader.
            data_samples (Sequence[dict]): A batch of outputs from
                the model.
        """

    @abstractmethod
    def compute_metrics(self, results: list) -> dict:
        """Compute the metrics from processed results.

        Args:
            results (list): The processed results of each batch.

        Returns:
            dict: The computed metrics. The keys are the names of the
            metrics, and the values are corresponding results.
        """
    def evaluate(self, size: int) -> dict:
        """Evaluate the model performance of the whole dataset after
        processing all batches.

        Args:
            size (int): Length of the entire validation dataset. When batch
                size > 1, the dataloader may pad some data samples to make
                sure all ranks have the same length of dataset slice. The
                ``collect_results`` function will drop the padded data based
                on this size.

        Returns:
            dict: Evaluation metrics dict on the val dataset. The keys are
            the names of the metrics, and the values are corresponding
            results.
        """
        if len(self.results) == 0:
            print_log(
                f'{self.__class__.__name__} got empty `self.results`. Please '
                'ensure that the processed results are properly added into '
                '`self.results` in `process` method.',
                logger='current',
                level=logging.WARNING)

        if self.collect_device == 'cpu':
            results = collect_results(
                self.results,
                size,
                self.collect_device,
                tmpdir=self.collect_dir)
        else:
            results = collect_results(self.results, size, self.collect_device)

        if is_main_process():
            # cast all tensors in results to CPU before computing metrics
            results = _to_cpu(results)
            _metrics = self.compute_metrics(results)
            # add prefix to metric names to disambiguate evaluators
            if self.prefix:
                _metrics = {
                    '/'.join((self.prefix, k)): v
                    for k, v in _metrics.items()
                }
            metrics = [_metrics]
        else:
            metrics = [None]

        # sync the metrics computed on the main process to all other ranks
        broadcast_object_list(metrics)

        # reset the results list for the next evaluation round
        self.results.clear()
        return metrics[0]


@METRICS.register_module()
class DumpResults(BaseMetric):
    """Dump model predictions to a pickle file for offline evaluation.

    Args:
        out_file_path (str): Path of the dumped file. Must end with '.pkl'
            or '.pickle'.
        collect_device (str): Device name used for collecting results from
            different ranks during distributed training. Must be 'cpu' or
            'gpu'. Defaults to 'cpu'.
        collect_dir (str, optional): Synchronize directory for collecting
            data from different ranks. This argument should only be
            configured when ``collect_device`` is 'cpu'. Defaults to None.
            `New in version 0.7.3.`
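
    Examples:
        A usage sketch; the output path is illustrative:

        >>> metric = DumpResults(out_file_path='./predictions.pkl')
        >>> metric.process(None, [dict(pred_label=1)])
        >>> _ = metric.evaluate(size=1)  # writes ./predictions.pkl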
| """ |
|
|
| def __init__(self, |
| out_file_path: str, |
| collect_device: str = 'cpu', |
| collect_dir: Optional[str] = None) -> None: |
| super().__init__( |
| collect_device=collect_device, collect_dir=collect_dir) |
| if not out_file_path.endswith(('.pkl', '.pickle')): |
| raise ValueError('The output file must be a pkl file.') |
| self.out_file_path = out_file_path |
|
|
| def process(self, data_batch: Any, predictions: Sequence[dict]) -> None: |
| """transfer tensors in predictions to CPU.""" |
| self.results.extend(_to_cpu(predictions)) |
|
|
| def compute_metrics(self, results: list) -> dict: |
| """dump the prediction results to a pickle file.""" |
| dump(results, self.out_file_path) |
| print_log( |
| f'Results has been saved to {self.out_file_path}.', |
| logger='current') |
| return {} |
|
|


def _to_cpu(data: Any) -> Any:
    """Recursively transfer tensors and ``BaseDataElement`` data to CPU."""
    if isinstance(data, (Tensor, BaseDataElement)):
        return data.to('cpu')
    elif isinstance(data, list):
        return [_to_cpu(d) for d in data]
    elif isinstance(data, tuple):
        return tuple(_to_cpu(d) for d in data)
    elif isinstance(data, dict):
        return {k: _to_cpu(v) for k, v in data.items()}
    else:
        return data