Shortcuts

Source code for ignite.metrics.loss

from typing import Callable, Dict, Sequence, Tuple, Union, cast

import torch

from ignite.exceptions import NotComputableError
from ignite.metrics.metric import Metric, reinit__is_reduced, sync_all_reduce

# Names exported by ``from ignite.metrics.loss import *``; ``Loss`` is the
# only public object defined in this module.
__all__ = ["Loss"]


class Loss(Metric):
    """Running average of a loss function over all examples seen so far.

    Every call to :meth:`update` evaluates ``loss_fn`` on one batch, weights the
    returned batch average by the batch size and adds it to a running sum.
    :meth:`compute` divides that sum by the number of accumulated examples.

    Args:
        loss_fn: a callable taking a prediction tensor, a target tensor and,
            optionally, other arguments. It must return the average loss over
            all observations in the batch (a zero-dimensional tensor).
        output_transform: a callable that is used to transform the
            :class:`~ignite.engine.engine.Engine`'s ``process_function``'s
            output into the form expected by the metric. This can be useful if,
            for example, you have a multi-output model and you want to compute
            the metric with respect to one of the outputs.
            The output is expected to be a tuple ``(prediction, target)`` or
            ``(prediction, target, kwargs)`` where ``kwargs`` is a dictionary of
            extra keyword arguments. If extra keyword arguments are provided
            they are passed to ``loss_fn``.
        batch_size: a callable taking a target tensor that returns the first
            dimension size (usually the batch size).
        device: specifies which device updates are accumulated on. Setting the
            metric's device to be the same as your ``update`` arguments ensures
            the ``update`` method is non-blocking. By default, CPU.
    """

    # NOTE(review): presumably overrides a ``Metric`` class attribute so that
    # outputs are not filtered by key before reaching ``update`` - confirm
    # against ``Metric``.
    required_output_keys = None

    def __init__(
        self,
        loss_fn: Callable,
        output_transform: Callable = lambda x: x,
        batch_size: Callable = lambda x: len(x),
        device: Union[str, torch.device] = torch.device("cpu"),
    ):
        super().__init__(output_transform, device=device)
        self._batch_size = batch_size
        self._loss_fn = loss_fn

    @reinit__is_reduced
    def reset(self) -> None:
        """Clear the accumulated loss sum and the example counter."""
        self._num_examples = 0
        self._sum = torch.tensor(0.0, device=self._device)

    @reinit__is_reduced
    def update(self, output: Sequence[Union[torch.Tensor, Dict]]) -> None:
        """Accumulate the loss of one batch.

        Args:
            output: ``(prediction, target)`` or ``(prediction, target, kwargs)``;
                ``kwargs`` is forwarded to ``loss_fn`` as keyword arguments.

        Raises:
            ValueError: if ``loss_fn`` returns a tensor that is not
                zero-dimensional, i.e. it did not reduce the batch to a single
                average value.
        """
        extra_kwargs = {}  # type: Dict
        if len(output) != 2:
            # Anything that is not a pair is treated as a triple carrying kwargs.
            y_pred, y, extra_kwargs = cast(Tuple[torch.Tensor, torch.Tensor, Dict], output)
        else:
            y_pred, y = cast(Tuple[torch.Tensor, torch.Tensor], output)

        # Detach so the accumulated value does not keep the autograd graph alive.
        batch_loss = self._loss_fn(y_pred, y, **extra_kwargs).detach()
        if len(batch_loss.shape) > 0:
            raise ValueError("loss_fn did not return the average loss.")

        # Weight the batch average by its size so that the final division in
        # ``compute`` yields a per-example average across unequal batches.
        count = self._batch_size(y)
        weighted_loss = batch_loss.to(self._device) * count
        self._sum += weighted_loss
        self._num_examples += count

    @sync_all_reduce("_sum", "_num_examples")
    def compute(self) -> float:
        """Return the accumulated loss divided by the number of examples.

        Raises:
            NotComputableError: if no example has been accumulated yet.
        """
        if self._num_examples == 0:
            raise NotComputableError("Loss must have at least one example before it can be computed.")
        total = self._sum.item()
        return total / self._num_examples