Shortcuts

Source code for ignite.metrics.psnr

from typing import Callable, Sequence, Union

import torch

from ignite.exceptions import NotComputableError
from ignite.metrics.metric import Metric, reinit__is_reduced, sync_all_reduce

__all__ = ["PSNR"]


[docs]class PSNR(Metric): r"""Computes average `Peak signal-to-noise ratio (PSNR) <https://en.wikipedia.org/wiki/Peak_signal-to-noise_ratio>`_. .. math:: \text{PSNR}(I, J) = 10 * \log_{10}\left(\frac{ MAX_{I}^2 }{ \text{ MSE } }\right) where :math:`\text{MSE}` is `mean squared error <https://en.wikipedia.org/wiki/Mean_squared_error>`_. - `y_pred` and `y` **must** have (batch_size, ...) shape. - `y_pred` and `y` **must** have same dtype and same shape. Args: data_range: The data range of the target image (distance between minimum and maximum possible values). For other data types, please set the data range, otherwise an exception will be raised. output_transform: A callable that is used to transform the Engine’s process_function’s output into the form expected by the metric. device: specifies which device updates are accumulated on. Setting the metric’s device to be the same as your update arguments ensures the update method is non-blocking. By default, CPU. Example: To use with ``Engine`` and ``process_function``, simply attach the metric instance to the engine. The output of the engine's ``process_function`` needs to be in format of ``(y_pred, y)`` or ``{'y_pred': y_pred, 'y': y, ...}``. .. code-block:: python def process_function(engine, batch): # ... return y_pred, y engine = Engine(process_function) psnr = PSNR(data_range=1.0) psnr.attach(engine, "psnr") # ... state = engine.run(data) print(f"PSNR: {state.metrics['psnr']}") This metric by default accepts Grayscale or RGB images. But if you have YCbCr or YUV images, only Y channel is needed for computing PSNR. And, this can be done with ``output_transform``. For instance, .. code-block:: python def get_y_channel(output): y_pred, y = output # y_pred and y are (B, 3, H, W) and YCbCr or YUV images # let's select y channel return y_pred[:, 0, ...], y[:, 0, ...] psnr = PSNR(data_range=219, output_transform=get_y_channel) psnr.attach(engine, "psnr") # ... state = engine.run(data) print(f"PSNR: {state.metrics['psrn']}") .. versionadded:: 0.4.3 """ def __init__( self, data_range: Union[int, float], output_transform: Callable = lambda x: x, device: Union[str, torch.device] = torch.device("cpu"), ): super().__init__(output_transform=output_transform, device=device) self.data_range = data_range def _check_shape_dtype(self, output: Sequence[torch.Tensor]) -> None: y_pred, y = output if y_pred.dtype != y.dtype: raise TypeError( f"Expected y_pred and y to have the same data type. Got y_pred: {y_pred.dtype} and y: {y.dtype}." ) if y_pred.shape != y.shape: raise ValueError( f"Expected y_pred and y to have the same shape. Got y_pred: {y_pred.shape} and y: {y.shape}." )
[docs] @reinit__is_reduced def reset(self) -> None: self._sum_of_batchwise_psnr = torch.tensor(0.0, dtype=torch.float64, device=self._device) self._num_examples = 0
[docs] @reinit__is_reduced def update(self, output: Sequence[torch.Tensor]) -> None: self._check_shape_dtype(output) y_pred, y = output[0].detach(), output[1].detach() dim = tuple(range(1, y.ndim)) mse_error = torch.pow(y_pred.double() - y.view_as(y_pred).double(), 2).mean(dim=dim) self._sum_of_batchwise_psnr += torch.sum(10.0 * torch.log10(self.data_range ** 2 / (mse_error + 1e-10))).to( device=self._device ) self._num_examples += y.shape[0]
[docs] @sync_all_reduce("_sum_of_batchwise_psnr", "_num_examples") def compute(self) -> torch.Tensor: if self._num_examples == 0: raise NotComputableError("PSNR must have at least one example before it can be computed.") return self._sum_of_batchwise_psnr / self._num_examples