Source code for ai4water.postprocessing.SeqMetrics._main

import json
import warnings
import numpy as np
from typing import Union

from ai4water.utils.utils import ts_features

from .utils import msg


# TODO remove repeated calculation of mse, std, mean etc
# TODO make weights, class attribute
# TODO write tests
# TODO standardized residual sum of squares
# http://documentation.sas.com/?cdcId=fscdc&cdcVersion=15.1&docsetId=fsug&docsetTarget=n1sm8nk3229ttun187529xtkbtpu.htm&locale=en
# https://arxiv.org/ftp/arxiv/papers/1809/1809.03006.pdf
# https://www.researchgate.net/profile/Mark-Tschopp/publication/322147437_Quantifying_Similarity_and_Distance_Measures_for_Vector-Based_Datasets_Histograms_Signals_and_Probability_Distribution_Functions/links/5a48089ca6fdcce1971c8142/Quantifying-Similarity-and-Distance-Measures-for-Vector-Based-Datasets-Histograms-Signals-and-Probability-Distribution-Functions.pdf
# maximum absolute error
# relative mean absolute error
# relative rmse
# Log mse
# Jeffreys Divergence
# Kullback-Leibler divergence
# Peak flow ratio https://hess.copernicus.org/articles/24/869/2020/
# Legates' coefficient of efficiency
# outlier percentage : pysteps
# mean squared error skill score, mean absolute error skill score, https://doi.org/10.1016/j.ijforecast.2018.11.010
# root mean quartic error, Kolmogorov–Smirnov test integral, OVERPer, Rényi entropy,
# 95th percentile: https://doi.org/10.1016/j.solener.2014.10.016
# Friedman test: https://doi.org/10.1016/j.solener.2014.10.016

EPS = 1e-10  # epsilon

# TODO classification metrics
# cross entropy

# TODO probability losses
# log normal loss
# skill score

# TODO multi horizon metrics


class Metrics(object):

    """
    This class does some pre-processing and handles metadata regarding true and
    predicted arrays.

    The arguments other than `true` and `predicted` are dynamic, i.e. they can be
    changed from outside the class after it has been created. This is useful if,
    for example, we want to calculate an error once by ignoring NaNs and then
    again without ignoring them. However, the user has to run the method
    `treat_values` in order for the changed values to take effect on the true and
    predicted arrays.

    Literature:
        https://www-miklip.dkrz.de/about/murcss/
    """

    warnings.warn(msg("Metrics"), UserWarning)
    def __init__(self,
                 true: Union[np.ndarray, list],
                 predicted: Union[np.ndarray, list],
                 replace_nan: Union[int, float, None] = None,
                 replace_inf: Union[int, float, None] = None,
                 remove_zero: bool = False,
                 remove_neg: bool = False,
                 metric_type: str = 'regression'
                 ):
        """
        Arguments:
            true : array like, true/observed/actual/target values
            predicted : array like, simulated values
            replace_nan : default None. If not None, then NaNs in true and predicted
                will be replaced by this value.
            replace_inf : default None. If not None, then inf values in true and
                predicted will be replaced by this value.
            remove_zero : default False. If True, the zero values in true or predicted
                arrays will be removed. If a zero is found in one array, the
                corresponding value in the other array will also be removed.
            remove_neg : default False. If True, the negative values in true or
                predicted arrays will be removed.
            metric_type : type of metric.
        """
        self.metric_type = metric_type
        self.true, self.predicted = self._pre_process(true, predicted)
        self.replace_nan = replace_nan
        self.replace_inf = replace_inf
        self.remove_zero = remove_zero
        self.remove_neg = remove_neg
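    # Usage sketch (added for illustration, not part of the original source): a minimal,
    # hedged example of constructing this class directly with two equal-length 1d arrays.
    # In practice a subclass would usually be used, but the base class is instantiable.
    #
    #   >>> import numpy as np
    #   >>> t = np.array([1.0, 2.0, 3.0, 4.0])
    #   >>> p = np.array([1.1, 1.9, 3.2, 3.8])
    #   >>> metrics = Metrics(t, p, replace_nan=0.0, remove_neg=True)
    #   >>> metrics.mse()  # doctest: +SKIP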
    @staticmethod
    def _minimal() -> list:
        raise NotImplementedError

    @staticmethod
    def _scale_independent_metrics() -> list:
        raise NotImplementedError

    @staticmethod
    def _scale_dependent_metrics() -> list:
        raise NotImplementedError

    @property
    def replace_nan(self):
        return self._replace_nan

    @replace_nan.setter
    def replace_nan(self, x):
        self._replace_nan = x

    @property
    def replace_inf(self):
        return self._replace_inf

    @replace_inf.setter
    def replace_inf(self, x):
        self._replace_inf = x

    @property
    def remove_zero(self):
        return self._remove_zero

    @remove_zero.setter
    def remove_zero(self, x):
        self._remove_zero = x

    @property
    def remove_neg(self):
        return self._remove_neg

    @remove_neg.setter
    def remove_neg(self, x):
        self._remove_neg = x

    @property
    def assert_greater_than_one(self):
        # assert that both true and predicted arrays are longer than one.
        if len(self.true) <= 1 or len(self.predicted) <= 1:
            raise ValueError(f"""Expect length of true and predicted arrays to be larger than 1 but they are
                            {len(self.true)} and {len(self.predicted)}""")
        return

    def _pre_process(self, true, predicted):

        predicted = self._assert_1darray(predicted)
        true = self._assert_1darray(true)
        assert len(predicted) == len(true), "lengths of provided arrays mismatch, predicted array: {}, true array: {}" \
            .format(len(predicted), len(true))

        return true, predicted

    def _assert_1darray(self, array_like) -> np.ndarray:
        """Makes sure that the provided `array_like` is a 1D numpy array"""
        if not isinstance(array_like, np.ndarray):
            if not isinstance(array_like, list):
                # it can be a pandas Series or DataFrame
                if array_like.__class__.__name__ in ['Series', 'DataFrame']:
                    if len(array_like.shape) > 1:  # 1d Series has shape (x,) while 1d DataFrame has shape (x, 1)
                        if array_like.shape[1] > 1:  # it is a 2d DataFrame
                            raise TypeError("only 1d pandas Series or DataFrame are allowed")
                    np_array = np.array(array_like).reshape(-1, )
                else:
                    raise TypeError(f"all inputs must be numpy array or list but one is of type {type(array_like)}")
            else:
                np_array = np.array(array_like).reshape(-1, )
        else:
            if np.ndim(array_like) > 1:
                sec_dim = array_like.shape[1]
                if self.metric_type != 'classification' and sec_dim > 1:
                    raise ValueError(f"Array must not be 2d but it has shape {array_like.shape}")
                np_array = np.array(array_like).reshape(-1, ) if self.metric_type != 'classification' else array_like
            else:
                # the array is already 1d; flatten it just to be sure
                np_array = array_like.reshape(-1, ) if self.metric_type != 'classification' else array_like

        if self.metric_type != 'classification':
            assert len(np_array.shape) == 1
        return np_array
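    # Illustrative sketch (added, hedged): the coercion rules above accept lists,
    # 1d numpy arrays, pandas Series and single-column DataFrames, and reject wider
    # 2d inputs when metric_type is not 'classification'. For example:
    #
    #   >>> import numpy as np
    #   >>> Metrics([1, 2, 3], np.array([1.0, 2.0, 3.0])).true.shape
    #   (3,)
    #   >>> Metrics(np.ones((3, 2)), np.ones(3))  # doctest: +SKIP
    #   ValueError: Array must not be 2d but it has shape (3, 2)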
    def calculate_all(self, statistics=False, verbose=False, write=False, name=None) -> dict:
        """Calculates errors using all available methods except brier_score.

        write : bool, if True, will write the calculated errors to a file.
        name : str, if not None, then must be the path of the file in which to write.
        """
        errors = {}
        for m in self.all_methods:
            if m not in ["brier_score"]:
                try:
                    error = float(getattr(self, m)())
                # some errors might not have been computed and returned a non float-convertible value e.g. None
                except TypeError:
                    error = getattr(self, m)()
                errors[m] = error
                if verbose:
                    if error is None:
                        print('{0:25} :  {1}'.format(m, error))
                    else:
                        print('{0:25} :  {1:<12.3f}'.format(m, error))

        if statistics:
            errors['stats'] = self.stats(verbose=verbose)

        if write:
            if name is not None:
                assert isinstance(name, str)
                fname = name
            else:
                fname = 'errors'

            with open(fname + ".json", 'w') as fp:
                json.dump(errors, fp, sort_keys=True, indent=4)

        return errors
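    # Usage sketch (added, hedged): ``calculate_all`` relies on ``self.all_methods``,
    # which is expected to be defined by a subclass; ``RegressionMetrics`` and the file
    # name below are assumptions made only for this example.
    #
    #   >>> import numpy as np
    #   >>> t, p = np.random.random(50), np.random.random(50)
    #   >>> errors = RegressionMetrics(t, p).calculate_all(verbose=True, write=True, name="errors_demo")  # doctest: +SKIP
    #   # writes errors_demo.json with all computed metrics, sorted by key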
    def calculate_minimal(self) -> dict:
        """
        Calculates some basic metrics.

        Returns
        -------
        dict
            Dictionary with all metrics
        """
        metrics = {}

        for metric in self._minimal():
            metrics[metric] = getattr(self, metric)()

        return metrics
    def _error(self, true=None, predicted=None):
        """ simple difference """
        if true is None:
            true = self.true
        if predicted is None:
            predicted = self.predicted
        return true - predicted

    def _percentage_error(self):
        """ Percentage error """
        return self._error() / (self.true + EPS) * 100

    def _naive_prognose(self, seasonality: int = 1):
        """ Naive forecasting method which just repeats previous samples """
        return self.true[:-seasonality]

    def _relative_error(self, benchmark: np.ndarray = None):
        """ Relative Error """
        if benchmark is None or isinstance(benchmark, int):
            # If no benchmark prediction provided - use naive forecasting
            if not isinstance(benchmark, int):
                seasonality = 1
            else:
                seasonality = benchmark
            return self._error(self.true[seasonality:], self.predicted[seasonality:]) / \
                   (self._error(self.true[seasonality:], self._naive_prognose(seasonality)) + EPS)

        return self._error() / (self._error(self.true, benchmark) + EPS)

    def _bounded_relative_error(self, benchmark: np.ndarray = None):
        """ Bounded Relative Error """
        if benchmark is None or isinstance(benchmark, int):
            # If no benchmark prediction provided - use naive forecasting
            if not isinstance(benchmark, int):
                seasonality = 1
            else:
                seasonality = benchmark

            abs_err = np.abs(self._error(self.true[seasonality:], self.predicted[seasonality:]))
            abs_err_bench = np.abs(self._error(self.true[seasonality:], self._naive_prognose(seasonality)))
        else:
            abs_err = np.abs(self._error())
            # error of the provided benchmark prediction against the true values
            abs_err_bench = np.abs(self._error(self.true, benchmark))

        return abs_err / (abs_err + abs_err_bench + EPS)

    def _ae(self):
        """ Absolute error """
        return np.abs(self.true - self.predicted)
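    # Worked sketch (added, hedged): with no benchmark, ``_relative_error`` compares the
    # model error against a one-step naive (persistence) forecast. For true = [2, 4, 6]
    # and predicted = [2, 5, 6], the model errors on the last two steps are [-1, 0] and
    # the naive-forecast errors are 4 - 2 = 2 and 6 - 4 = 2, so the relative errors are
    # roughly [-0.5, 0.0] (up to the EPS term in the denominator).
    #
    #   >>> m = Metrics([2.0, 4.0, 6.0], [2.0, 5.0, 6.0])
    #   >>> m._relative_error()  # doctest: +SKIP
    #   array([-0.5,  0. ])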
    def calculate_scale_independent_metrics(self) -> dict:
        """
        Calculates scale independent metrics

        Returns
        -------
        dict
            Dictionary with all metrics
        """
        metrics = {}

        for metric in self._scale_independent_metrics():
            metrics[metric] = getattr(self, metric)()

        return metrics
    def calculate_scale_dependent_metrics(self) -> dict:
        """
        Calculates scale dependent metrics

        Returns
        -------
        dict
            Dictionary with all metrics
        """
        metrics = {}

        for metric in self._scale_dependent_metrics():
            metrics[metric] = getattr(self, metric)()

        return metrics
    def scale_dependent_metrics(self):
        pass
    def stats(self, verbose: bool = False) -> dict:
        """ returns some important stats about true and predicted values."""
        _stats = dict()
        _stats['true'] = ts_features(self.true)
        _stats['predicted'] = ts_features(self.predicted)

        if verbose:
            print("\nName                       True        Predicted")
            print("----------------------------------------")
            for k in _stats['true'].keys():
                print("{:<25},  {:<10},  {:<10}"
                      .format(k, round(_stats['true'][k], 4), round(_stats['predicted'][k], 4)))

        return _stats
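    # Usage sketch (added, hedged): ``stats`` simply wraps ``ts_features`` for both
    # arrays; the exact feature keys depend on ``ai4water.utils.utils.ts_features``.
    #
    #   >>> s = metrics.stats(verbose=True)  # doctest: +SKIP
    #   >>> sorted(s.keys())                 # doctest: +SKIP
    #   ['predicted', 'true']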
    def percentage_metrics(self):
        pass

    def relative_metrics(self):
        pass

    def composite_metrics(self):
        pass
    def treat_values(self):
        """
        This function is applied by default at the start/at the time of initiating
        the class. However, it can be used any time after that. This can be handy
        if we want to calculate an error first by ignoring nan and then by not
        ignoring nan.

        Adopting from
        https://github.com/BYU-Hydroinformatics/HydroErr/blob/master/HydroErr/HydroErr.py#L6210
        Removes the nan, negative, and inf values in two numpy arrays
        """
        sim_copy = np.copy(self.predicted)
        obs_copy = np.copy(self.true)

        # Treat missing data in observed_array and simulated_array, rows in simulated_array or
        # observed_array that contain nan values
        all_treatment_array = np.ones(obs_copy.size, dtype=bool)

        if np.any(np.isnan(obs_copy)) or np.any(np.isnan(sim_copy)):
            if self.replace_nan is not None:
                # Finding the NaNs
                sim_nan = np.isnan(sim_copy)
                obs_nan = np.isnan(obs_copy)
                # Replacing the NaNs with the input
                sim_copy[sim_nan] = self.replace_nan
                obs_copy[obs_nan] = self.replace_nan

                warnings.warn("Element(s) {} contained NaN values in the simulated array and "
                              "element(s) {} contained NaN values in the observed array and have been "
                              "replaced (Elements are zero indexed).".format(np.where(sim_nan)[0],
                                                                             np.where(obs_nan)[0]),
                              UserWarning)
            else:
                # Getting the indices of the nan values, combining them, and informing user.
                nan_indices_fcst = ~np.isnan(sim_copy)
                nan_indices_obs = ~np.isnan(obs_copy)
                all_nan_indices = np.logical_and(nan_indices_fcst, nan_indices_obs)
                all_treatment_array = np.logical_and(all_treatment_array, all_nan_indices)

                warnings.warn("Row(s) {} contained NaN values and the row(s) have been "
                              "removed (Rows are zero indexed).".format(np.where(~all_nan_indices)[0]),
                              UserWarning)

        if np.any(np.isinf(obs_copy)) or np.any(np.isinf(sim_copy)):
            if self.replace_inf is not None:
                # Finding the Infs
                sim_inf = np.isinf(sim_copy)
                obs_inf = np.isinf(obs_copy)
                # Replacing the Infs with the input
                sim_copy[sim_inf] = self.replace_inf
                obs_copy[obs_inf] = self.replace_inf

                warnings.warn("Element(s) {} contained Inf values in the simulated array and "
                              "element(s) {} contained Inf values in the observed array and have been "
                              "replaced (Elements are zero indexed).".format(np.where(sim_inf)[0],
                                                                             np.where(obs_inf)[0]),
                              UserWarning)
            else:
                inf_indices_fcst = ~(np.isinf(sim_copy))
                inf_indices_obs = ~np.isinf(obs_copy)
                all_inf_indices = np.logical_and(inf_indices_fcst, inf_indices_obs)
                all_treatment_array = np.logical_and(all_treatment_array, all_inf_indices)

                warnings.warn(
                    "Row(s) {} contained Inf or -Inf values and the row(s) have been removed (Rows "
                    "are zero indexed).".format(np.where(~all_inf_indices)[0]),
                    UserWarning
                )

        # Treat zero data in observed_array and simulated_array, rows in simulated_array or
        # observed_array that contain zero values
        if self.remove_zero:
            if (obs_copy == 0).any() or (sim_copy == 0).any():
                zero_indices_fcst = ~(sim_copy == 0)
                zero_indices_obs = ~(obs_copy == 0)
                all_zero_indices = np.logical_and(zero_indices_fcst, zero_indices_obs)
                all_treatment_array = np.logical_and(all_treatment_array, all_zero_indices)

                warnings.warn(
                    "Row(s) {} contained zero values and the row(s) have been removed (Rows are "
                    "zero indexed).".format(np.where(~all_zero_indices)[0]),
                    UserWarning
                )

        # Treat negative data in observed_array and simulated_array, rows in simulated_array or
        # observed_array that contain negative values

        # Ignore runtime warnings from comparing
        if self.remove_neg:
            with np.errstate(invalid='ignore'):
                obs_copy_bool = obs_copy < 0
                sim_copy_bool = sim_copy < 0

            if obs_copy_bool.any() or sim_copy_bool.any():
                neg_indices_fcst = ~sim_copy_bool
                neg_indices_obs = ~obs_copy_bool
                all_neg_indices = np.logical_and(neg_indices_fcst, neg_indices_obs)
                all_treatment_array = np.logical_and(all_treatment_array, all_neg_indices)

                warnings.warn("Row(s) {} contained negative values and the row(s) have been "
                              "removed (Rows are zero indexed).".format(np.where(~all_neg_indices)[0]),
                              UserWarning)

        self.true = obs_copy[all_treatment_array]
        self.predicted = sim_copy[all_treatment_array]

        return
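    # Usage sketch (added, hedged): the treatment flags can be changed after
    # construction, and ``treat_values`` must then be re-run for them to take effect.
    #
    #   >>> import numpy as np
    #   >>> m = Metrics([1.0, np.nan, 3.0], [1.0, 2.0, 3.5])
    #   >>> m.replace_nan = 0.0     # change the flag after construction
    #   >>> m.treat_values()        # NaNs in both arrays are now replaced by 0.0
    #   >>> m.mse()                 # doctest: +SKIP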
    def mse(self, weights=None) -> float:
        """ mean square error """
        return float(np.average((self.true - self.predicted) ** 2, axis=0, weights=weights))
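    # Usage sketch (added, hedged): ``weights`` is passed straight to ``np.average``,
    # so per-sample weights can emphasise particular samples (e.g. peaks).
    #
    #   >>> import numpy as np
    #   >>> t = np.array([1.0, 2.0, 3.0, 4.0])
    #   >>> p = np.array([1.1, 1.9, 3.2, 3.8])
    #   >>> w = np.array([1.0, 1.0, 5.0, 1.0])
    #   >>> Metrics(t, p).mse(weights=w)  # doctest: +SKIP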