File size: 2,187 Bytes
6d5047c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""Base metric class and batch/aggregate helpers."""

from __future__ import annotations

from collections import defaultdict
from typing import Dict, List

import torch


class Metric:
    """Base class for metrics that accumulate per-batch results.

    Calling an instance runs ``_compute`` on the batch inputs, stores a detached CPU
    float32 copy of every returned value under its key, and returns the batch result
    unchanged. ``aggregate()`` combines everything stored since the last ``clear()``.

    Subclasses implement ``_compute`` and return a mapping from metric name to a tensor
    (or to any value ``torch.as_tensor`` accepts, such as a Python or NumPy number).
    """

    def __init__(self, **kwargs):
        """Start with empty storage; keyword arguments are accepted but unused here."""
        self.clear()

    def __call__(self, *args, **kwargs):
        """Compute the metric for the current batch and record it.

        Args:
            *args: Forwarded unchanged to ``_compute``.
            **kwargs: Forwarded unchanged to ``_compute``.

        Returns:
            The mapping returned by ``_compute``, untouched (values keep their original
            device, dtype and autograd state).
        """
        metrics = self._compute(*args, **kwargs)
        for key, val in metrics.items():
            # as_tensor returns tensors as-is and converts plain numbers/arrays, so
            # subclasses may return either. Values are not cloned: a CPU float32 tensor
            # is stored sharing memory with the one handed back to the caller.
            stored = torch.as_tensor(val).detach().cpu().float()
            self.saved_metrics[key].append(stored)
        return metrics

    def _compute(self, *args, **kwargs):
        """Compute the metric dict from batch inputs; subclasses must override.

        Accepts the same positional and keyword arguments that ``__call__`` forwards.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

    def clear(self):
        """Reset all accumulated metric values."""
        self.saved_metrics = defaultdict(list)

    def aggregate(self):
        """Combine the values stored for each key across all recorded batches.

        Returns:
            A dict mapping each key to a single tensor: stored values are concatenated
            along dim 0 when ``torch.cat`` accepts them, otherwise stacked along a new
            leading dim. Keys that have no stored values are omitted.

        Raises:
            RuntimeError: If the values stored under a key can be neither concatenated
                nor stacked (e.g. incompatible shapes).
        """
        output = {}
        for key, lst in self.saved_metrics.items():
            if not lst:
                # A bare read of saved_metrics[key] makes the defaultdict insert an
                # empty list; there is nothing to combine for such a key.
                continue
            try:
                output[key] = torch.cat(lst)
            except RuntimeError:
                # cat rejects 0-dim (scalar) tensors; stack them into one tensor instead.
                output[key] = torch.stack(lst)
        return output


def compute_metrics(metrics_list: List[Metric], metrics_in: Dict) -> Dict:
    """Evaluate every metric on one batch and merge the per-metric results.

    Args:
        metrics_list: Metrics to evaluate, in order.
        metrics_in: Keyword arguments passed unchanged to each metric call.

    Returns:
        One dict holding every metric's batch result. When two metrics emit the same
        key, the value from the metric later in ``metrics_list`` wins.
    """
    combined = {}
    for evaluator in metrics_list:
        batch_result = evaluator(**metrics_in)
        combined.update(batch_result)
    return combined


def aggregate_metrics(metrics_list: List[Metric]) -> Dict:
    """Merge the ``aggregate()`` output of every metric into a single dict.

    Args:
        metrics_list: Metrics whose accumulated results should be collected, in order.

    Returns:
        One dict with every key produced by the metrics' ``aggregate()`` calls. When
        two metrics share a key, the value from the later metric wins.
    """
    merged = {}
    for tracker in metrics_list:
        accumulated = tracker.aggregate()
        merged.update(accumulated)
    return merged


def clear_metrics(metrics_list: List[Metric]) -> None:
    """Reset every metric in the list by calling its ``clear()`` method, in order.

    Args:
        metrics_list: Metrics whose accumulated values should be discarded.
    """
    for tracker in metrics_list:
        tracker.clear()