Source code for ai4water.utils.utils


import copy
import json
import pprint
import datetime
import warnings
from typing import Union, Any, Tuple, List
from shutil import rmtree
from types import FunctionType
import collections.abc as collections_abc

import scipy
from ai4water.backend import np, pd, plt, os
from easy_mpl import imshow
from scipy.stats import skew, kurtosis, variation, gmean, hmean

try:
    import wrapt
except ModuleNotFoundError:
    wrapt = None


MATRIC_TYPES = {
    "r2": "max",
    "nse": "max",
    "r2_score": "max",
    "kge": "max",
    "corr_coeff": "max",
    'accuracy': "max",
    'f1_score': 'max',
    "mse": "min",
    "rmse": "min",
    "mape": "min",
    "nrmse": "min",
}

ERROR_LABELS = {
    'r2': "$R^{2}$",
    'nse': 'NSE',
    'rmse': 'RMSE',
    'mse': 'MSE',
    'msle': 'MSLE',
    'nrmse': 'Normalized RMSE',
    'mape': 'MAPE',
    'r2_score': "$R^{2}$ Score",
    'mae': 'MAE',
    'mase': 'MASE'
}

def reset_seed(seed: Union[int, None], os=None, random=None, np=None,
               tf=None, torch=None):
    """
    Sets the random seed for a given module if the module is not None
    Arguments:
        seed : Value of seed to set. If None, then it means we don't wan't to set
        the seed.
        os : alias for `os` module of python
        random : alias for `random` module of python
        np : alias for `numpy` module
        tf : alias for `tensorflow` module.
        torch : alias for `pytorch` module.
        """
    if seed:
        if np:
            np.random.seed(seed)

        if random:
            random.seed(seed)

        if os:
            os.environ['PYTHONHASHSEED'] = str(seed)

        if tf:
            if int(tf.__version__.split('.')[0]) == 1:
                tf.compat.v1.random.set_random_seed(seed)
            elif int(tf.__version__.split('.')[0]) > 1:
                tf.random.set_seed(seed)

        if torch:
            torch.manual_seed(seed)
            torch.cuda.manual_seed(seed)
            torch.backends.cudnn.deterministic = True
            torch.backends.cudnn.benchmark = False
    return
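
# Illustrative usage (a sketch, not part of the original source): only the
# modules explicitly passed in are seeded; anything left as None is skipped.
# >>> import os, random
# >>> import numpy as np
# >>> reset_seed(313, os=os, random=random, np=np)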


def maybe_create_path(prefix=None, path=None):
    if path is None:
        save_dir = dateandtime_now()
        model_dir = os.path.join(os.getcwd(), "results")

        if prefix:
            model_dir = os.path.join(model_dir, prefix)

        save_dir = os.path.join(model_dir, save_dir)
    else:
        save_dir = path

    if not os.path.exists(save_dir):
        os.makedirs(save_dir)

    for _dir in ['weights']:
        if not os.path.exists(os.path.join(save_dir, _dir)):
            os.makedirs(os.path.join(save_dir, _dir))

    return save_dir


def dateandtime_now() -> str:
    """
    Returns the datetime in following format as string
    YYYYMMDD_HHMMSS
    """
    jetzt = datetime.datetime.now()
    dt = ''
    for time in ['year', 'month', 'day', 'hour', 'minute', 'second']:
        _time = str(getattr(jetzt, time))
        if len(_time) < 2:
            _time = '0' + _time
        if time == 'hour':
            _time = '_' + _time
        dt += _time
    return dt
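
# Illustrative usage (a sketch, not part of the original source): the returned
# string is suitable as a folder name, e.g. '20230517_112045' for
# 17 May 2023, 11:20:45.
# >>> folder_name = dateandtime_now()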


def dict_to_file(
        path,
        config=None, errors=None,
        indices=None, others=None, name=''):

    sort_keys = True
    if errors is not None:
        suffix = dateandtime_now()
        fpath = path + "/errors_" + name + suffix + ".json"
        # maybe some errors are not json serializable.
        for er_name, er_val in errors.items():
            if "int" in er_val.__class__.__name__:
                errors[er_name] = int(er_val)
            elif "float" in er_val.__class__.__name__:
                errors[er_name] = float(er_val)

        data = errors
    elif config is not None:
        fpath = path + "/config.json"
        data = config
        sort_keys = False
    elif indices is not None:
        fpath = path + "/indices.json"
        data = indices
    else:
        assert others is not None
        data = others
        fpath = path

    if 'config' in data:
        if data['config'].get('model', None) is not None:
            model = data['config']['model']
            # because ML args which come from algorithms may not be json serializable.
            if 'layers' not in model:

                model = jsonize(model)
                data['config']['model'] = model

    with open(fpath, 'w') as fp:
        json.dump(data, fp, sort_keys=sort_keys, indent=4, cls=JsonEncoder)

    return


def check_min_loss(epoch_losses, epoch, msg: str, save_fg: bool, to_save=None):
    epoch_loss_array = epoch_losses[:-1]

    current_epoch_loss = epoch_losses[-1]

    if len(epoch_loss_array) > 0:
        min_loss = np.min(epoch_loss_array)
    else:
        min_loss = current_epoch_loss

    if np.less(current_epoch_loss, min_loss):
        msg = msg + "    {:10.5f} ".format(current_epoch_loss)

        if to_save is not None:
            save_fg = True
    else:
        msg = msg + "              "

    return msg, save_fg


def check_kwargs(**kwargs):

    mode = "ML"
    if kwargs.get('model', None) is not None:
        model = kwargs['model']
        if isinstance(model, dict):
            if 'layers' in model:
                is_custom_model=False
                model_name = None
                mode="DL"
            else:
                assert len(model)==1
                _model = list(model.keys())[0]
                if isinstance(_model, str):
                    model_name = _model
                    is_custom_model = False
                elif hasattr(_model, '__call__'):   # uninstantiated class
                    check_attributes(_model, ['fit', 'predict', '__init__'])
                    model_name = _model.__name__
                    is_custom_model = True
                else:  # custom class is already instantiated
                    check_attributes(_model, ['fit', 'predict'])
                    is_custom_model = True
                    model_name = _model.__class__.__name__

        # for case when model='randomforestregressor'
        elif isinstance(model, str):
            kwargs['model'] = {model: {}}
            is_custom_model = False
            model_name = model

        elif hasattr(model, '__call__'):  # uninstantiated class
            check_attributes(model, ['fit', 'predict', '__init__'])
            model_name = model.__name__
            is_custom_model = True
            kwargs['model'] = {model: {}}

        else:
            check_attributes(model, ['fit', 'predict'])
            is_custom_model = True
            model_name = model.__class__.__name__
            kwargs['model'] = {model: {}}

        if mode=="ML":
            # for ML, default batches will be 2d unless the user specifies
            # otherwise.
            if "batches" not in kwargs:
                kwargs["batches"] = "2d"

            if "ts_args" not in kwargs:
                kwargs["ts_args"] = {'lookback': 1,
                                     'forecast_len': 1,
                                     'forecast_step': 0,
                                     'known_future_inputs': False,
                                     'input_steps': 1,
                                     'output_steps': 1}
    else:
        is_custom_model = False
        model_name = None

    if is_custom_model:
        if 'mode' not in kwargs:
            raise ValueError("""your must provide 'mode' keyword either as 
            mode='regression' or mode='classification' for custom models""")

    return kwargs, model_name, is_custom_model
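
# Illustrative usage (a sketch, not part of the original source; assumes
# scikit-learn is installed): a custom model class having fit/predict must be
# accompanied by the 'mode' keyword.
# >>> from sklearn.ensemble import RandomForestRegressor
# >>> kwargs, name, is_custom = check_kwargs(model=RandomForestRegressor,
# ...                                        mode="regression")
# >>> name, is_custom
# ('RandomForestRegressor', True)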


class make_model(object):

    def __init__(self, **kwargs):

        self.config, self.data_config, self.opt_paras, self.orig_model = _make_model(
            **kwargs)


def process_io(**kwargs):

    input_features = kwargs.get('input_features', None)
    output_features = kwargs.get('output_features', None)

    if isinstance(input_features, str):
        input_features = [input_features]
    if isinstance(output_features, str):
        output_features = [output_features]

    kwargs['input_features'] = input_features
    kwargs['output_features'] = output_features
    return kwargs


def _make_model(**kwargs):
    """
    This functions fills the default arguments needed to run all the models.
    All the input arguments can be overwritten
    by providing their name.
    :return
      nn_config: `dict`, contais parameters to build and train the neural network
        such as `layers`
      data_config: `dict`, contains parameters for data preparation/pre-processing/post-processing etc.
    """

    kwargs = process_io(**kwargs)

    kwargs, model_name, is_custom_model = check_kwargs(**kwargs)

    model = kwargs.get('model', None)
    def_cat = None

    if model is not None:
        if 'layers' in model:
            def_cat = "DL"
            # for DL, the default mode case will be regression
        else:
            def_cat = "ML"

    accept_additional_args = False
    if 'accept_additional_args' in kwargs:
        accept_additional_args = kwargs.pop('accept_additional_args')

    model_args = {

        'model': {'type': dict, 'default': None, 'lower': None, 'upper': None, 'between': None},
        # can be None or any of the methods defined in ai4water.utils.transformations.py
        'x_transformation': {"type": [str, type(None), dict, list], "default": None, 'lower': None,
                             'upper': None, 'between': None},
        'y_transformation': {"type": [str, type(None), dict, list], "default": None, 'lower': None,
                             'upper': None, 'between': None},
        # for auto-encoders
        'composite':    {'type': bool, 'default': False, 'lower': None, 'upper': None, 'between': None},
        'lr':           {'type': float, 'default': 0.001, 'lower': None, 'upper': None, 'between': None},
        # can be any of valid keras optimizers https://www.tensorflow.org/api_docs/python/tf/keras/optimizers
        'optimizer':    {'type': str, 'default': 'adam', 'lower': None, 'upper': None, 'between': None},
        'loss':         {'type': [str, 'callable'], 'default': 'mse', 'lower': None, 'upper': None, 'between': None},
        'quantiles':    {'type': list, 'default': None, 'lower': None, 'upper': None, 'between': None},
        'epochs':       {'type': int, 'default': 14, 'lower': None, 'upper': None, 'between': None},
        'min_val_loss': {'type': float, 'default': 0.0001, 'lower': None, 'upper': None, 'between': None},
        'patience':     {'type': int, 'default': 100, 'lower': None, 'upper': None, 'between': None},
        'shuffle':      {'type': bool, 'default': True, 'lower': None, 'upper': None, 'between': None},
        # to save the best models using checkpoints
        'save_model':   {'type': bool, 'default': True, 'lower': None, 'upper': None, 'between': None},
        'backend':       {'type': None, 'default': 'tensorflow', 'lower': None, 'upper': None,
                          'between': ['tensorflow', 'pytorch']},
        # buffer_size is only relevant if 'val_data' is same and shuffle is true.
        # https://www.tensorflow.org/api_docs/python/tf/data/Dataset#shuffle
        # It is used to shuffle tf.Dataset of training data.
        'buffer_size': {'type': int, 'default': 100, 'lower': None, 'upper': None, 'between': None},
        # comes handy if we want to skip certain batches from last
        'batches_per_epoch': {"type": int, "default": None, 'lower': None, 'upper': None, 'between': None},
        # https://www.tensorflow.org/api_docs/python/tf/keras/Model#fit
        'steps_per_epoch': {"type": int, "default": None, 'lower': None, 'upper': None, 'between': None},
        # can be string or list of strings such as 'mse', 'kge', 'nse', 'pbias'
        'monitor': {"type": [list, type(None), str], "default": None, 'lower': None, 'upper': None, 'between': None},
        # todo, is it redundant?
        # If the model takes only one kind of input_features, i.e. it consists of
        # only 1 Input layer, then the shape of the batches will be inferred from
        # that Input layer. But when the model takes more than 1 Input, there can
        # be two cases: either all the input_features are of the same shape or
        # they are not. In the second case, we should overwrite the `train_paras`
        # method. In the former case, define whether the batches are 2d or 3d.
        # 3d means it is for an LSTM and 2d means it is for a Dense layer.
        'batches': {"type": str, "default": '3d', 'lower': None, 'upper': None, 'between': ["2d", "3d"]},
        'prefix': {"type": str, "default": None, 'lower': None, 'upper': None, 'between': None},
        'path': {"type": str, "default": None, 'lower': None, 'upper': None, 'between': None},
        'kmodel': {'type': None, "default": None, 'lower': None, 'upper': None, 'between': None},
        'cross_validator': {'default': None, 'between': ['LeaveOneOut', 'kfold']},
        'wandb_config': {'type': dict, 'default': None, 'between': None},
        'val_metric': {'type': str, 'default': None},
        'model_name_': {'default': None},
        'is_custom_model_': {"default": None},
    }

    data_args = {
        # if the shape of last batch is smaller than batch size and if we
        # want to skip this last batch, set following to True.
        # Useful if we have a fixed batch size in our model but the number of samples is not fully divisible by the batch size
        'drop_remainder': {"type": bool, "default": False, 'lower': None, 'upper': None, 'between': [True, False]},
        'category': {'type': str, 'default': def_cat, 'lower': None, 'upper': None, 'between': ["ML", "DL"]},
        'mode': {'type': str, 'default': None, 'lower': None, 'upper': None,
                 'between': ["regression", "classification"]},
        'batch_size':        {"type": int,   "default": 32, 'lower': None, 'upper': None, 'between': None},
        'split_random': {'type': bool, 'default': False, 'between': [True, False]},
        # fraction of data to be used for validation
        'val_fraction':      {"type": float, "default": 0.2, 'lower': None, 'upper': None, 'between': None},
        # the following argument can be set to 'same' for cases where you want to use the same data for validation as well as
        # test data. If it is 'same', then the same fraction/amount of data will be used for validation and test.
        # If this is not a string and not None, this will overwrite `val_fraction`
        'indices':          {"type": dict,  "default": None, 'lower': None, 'upper': None, 'between': ["same", None]},
        # fraction of data to be used for test
        'train_fraction':     {"type": float, "default": 0.7, 'lower': None, 'upper': None, 'between': None},
        # write the data/batches as hdf5 file
        'save':        {"type": bool,  "default": False, 'lower': None, 'upper': None, 'between': None},
        'allow_nan_labels':       {"type": int,  "default": 0, 'lower': 0, 'upper': 2, 'between': None},

        'nan_filler':        {"type": None, "default": None, "lower": None, "upper": None, "between": None},

        # for reproducibility
        'seed':              {"type": None, "default": 313, 'lower': None, 'upper': None, 'between': None},
        # input features in data_frame
        'input_features':            {"type": None, "default": None, 'lower': None, 'upper': None, 'between': None},
        # column in dataframe to be used as output/target
        'output_features':           {"type": None, "default": None, 'lower': None, 'upper': None, 'between': None},
        # tuple of tuples where each tuple consists of two integers, marking the start and end
        # of an interval. An interval here
        # means chunk/rows from the input file/dataframe to be skipped when preparing
        # data/batches for NN. This happens
        # when we have for example some missing values at some time in our data.
        # For further usage see `examples/using_intervals`
        "intervals":         {"type": None, "default": None, 'lower': None, 'upper': None, 'between': None},
        'verbosity':         {"type": int, "default": 1, 'lower': None, 'upper': None, 'between': None},
        'teacher_forcing': {'type': bool, 'default': False, 'lower': None, 'upper': None, 'between': [True, False]},
        'dataset_args': {'type': dict, 'default': {}},
        'ts_args': {"type": dict, "default": {'lookback': 1,
                                              'forecast_len': 1,
                                              'forecast_step': 0,
                                              'known_future_inputs': False,
                                              'input_steps': 1,
                                              'output_steps': 1}}
    }

    model_config = {key: val['default'] for key, val in model_args.items()}

    config = {key: val['default'] for key, val in data_args.items()}

    opt_paras = {}
    # because there are two kinds of hyperparameters which can be optimized;
    # some are in the model config and others are in the main config
    original_other_conf = {}
    original_mod_conf = {}

    for key, val in kwargs.items():
        arg_name = key.lower()  # todo, why this?

        if val.__class__.__name__ in ['Integer', "Real", "Categorical"]:
            opt_paras[key] = val
            val2 = val
            val = jsonize(val.rvs(1)[0])

            val2.name = key
            original_other_conf[key] = val2

        if key == 'model':
            val, _opt_paras, original_mod_conf = find_opt_paras_from_model_config(val)
            opt_paras.update(_opt_paras)

        if key == 'ts_args':
            val, _opt_paras = find_opt_paras_from_ts_args(val)
            opt_paras.update(_opt_paras)

        if arg_name in model_config:
            update_dict(arg_name, val, model_args, model_config)

        elif arg_name in config:
            update_dict(arg_name, val, data_args, config)

        elif arg_name in ['x_transformer_', 'y_transformer_']:
            config[arg_name] = val

        # config may contain additional user defined args which will not be checked
        elif not accept_additional_args:
            raise ValueError(f"Unknown keyworkd argument '{key}' provided")
        else:
            config[key] = val

    if config['allow_nan_labels'] > 0:
        assert 'layers' in model_config['model'], """
The argument `model` does not contain layers, so the model does not appear
to be deep learning based, yet you are allowing nan labels in the targets.
`allow_nan_labels` should be > 0 only for deep learning models
"""

    config.update(model_config)

    if isinstance(config['input_features'], dict):
        for data in [config['input_features'], config['output_features']]:
            for k, v in data.items():
                assert isinstance(v, list), f"""
                {k} is of type {v.__class__.__name__} but it must be of type list
                {k}: {v}"""

    _data_config = {}
    for key, val in config.items():
        if key in data_args:
            _data_config[key] = val

    config['model_name_'] = model_name
    config['is_custom_model_'] = is_custom_model

    return config, _data_config, opt_paras, {'model': original_mod_conf, 'other': original_other_conf}


def update_dict(key, val, dict_to_lookup, dict_to_update):
    """Updates the dictionary with key, val if the val is of type dtype."""
    dtype = dict_to_lookup[key].get('type', None)
    low = dict_to_lookup[key].get('lower', None)
    up = dict_to_lookup[key].get('upper', None)
    between = dict_to_lookup[key].get('between', None)

    if dtype is not None:
        if isinstance(dtype, list):
            val_type = type(val)
            if 'callable' in dtype:
                if callable(val):
                    pass

            elif val_type not in dtype:
                raise TypeError("{} must be any of the type {} but it is of type {}"
                                .format(key, dtype, val.__class__.__name__))
        elif not isinstance(val, dtype):
            # the default value may be None, which will be different from the dtype
            if val != dict_to_lookup[key]['default']:
                raise TypeError(f"""
                {key} must be of type {dtype} but it is of type {val.__class__.__name__} 
                {key}: {val}
                """)

    if isinstance(val, (int, float)):
        if low is not None:
            if val < low:
                raise ValueError(f"""
                The value '{val}' for '{key}' must be greater than '{low}'""")
        if up is not None:
            if val > up:
                raise ValueError(f"""
                The value '{val}' for '{key}' must be less than '{up}'""")

    if isinstance(val, str):
        if between is not None:
            if val not in between:
                raise ValueError(f"""
                Unknown value '{val}' for '{key}'. It must be one of '{between}'""")

    dict_to_update[key] = val
    return


def deepcopy_dict_without_clone(d: dict) -> dict:
    """makes deepcopy of a dictionary without cloning it"""
    assert isinstance(d, dict)

    new_d = {}
    for k, v in d.items():
        if isinstance(v, dict):
            new_d[k] = deepcopy_dict_without_clone(v)
        elif hasattr(v, '__len__'):
            new_d[k] = v[:]
        else:
            new_d[k] = copy.copy(v)
    return new_d
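
# Illustrative usage (a sketch, not part of the original source): nested dicts
# are copied recursively while sequences are shallow-sliced, so mutating the
# copy does not touch the original.
# >>> d = {'a': {'b': [1, 2, 3]}, 'c': 1.0}
# >>> new_d = deepcopy_dict_without_clone(d)
# >>> new_d['a']['b'].append(4)   # d['a']['b'] still has 3 members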


def find_opt_paras_from_ts_args(ts_args:dict)->tuple:
    opt_paras = {}
    new_ts_args = {'lookback': 15,
                   'forecast_len': 1,
                   'forecast_step': 0,
                   'known_future_inputs': False,
                   'input_steps': 1,
                   'output_steps': 1}
    new_ts_args.update(ts_args)

    for k,v in ts_args.items():

        if v.__class__.__name__ in ['Integer', 'Real', 'Categorical']:
            if v.name is None or v.name.startswith("integer_") or v.name.startswith("real_"):
                v.name = k
            opt_paras[k] = v
            v = v.rvs(1)[0]
            new_ts_args[k] = v

    return new_ts_args, opt_paras
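
# Illustrative usage (a sketch; assumes ``Integer`` is importable from
# ai4water.hyperopt, which is not imported in this module): a search space
# inside ts_args is replaced by a sampled value and collected in opt_paras.
# >>> from ai4water.hyperopt import Integer
# >>> new_args, opt = find_opt_paras_from_ts_args({'lookback': Integer(5, 15)})
# >>> list(opt.keys())
# ['lookback']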

def find_opt_paras_from_model_config(
        config: Union[dict, str, None]
) -> Tuple[Union[dict, None, str], dict, Union[dict, str, None]]:

    opt_paras = {}

    if config is None or isinstance(config, str):
        return config, opt_paras, config

    assert isinstance(config, dict) and len(config) == 1

    if 'layers' in config:
        original_model_config, _ = process_config_dict(
            deepcopy_dict_without_clone(config['layers']), False)

        # it is a nn based model
        new_lyrs_config, opt_paras = process_config_dict(config['layers'])
        new_model_config = {'layers': new_lyrs_config}

    else:
        # it is a classical ml model
        _ml_config = {}
        ml_config: dict = list(config.values())[0]
        model_name = list(config.keys())[0]

        original_model_config, _ = process_config_dict(
            copy.deepcopy(config[model_name]), False)

        for k, v in ml_config.items():

            if v.__class__.__name__ in ['Integer', 'Real', 'Categorical']:

                if v.name is None or v.name.startswith("integer_") or v.name.startswith("real_"):
                    v.name = k
                opt_paras[k] = v
                v = v.rvs(1)[0]

            _ml_config[k] = v
        val = _ml_config
        new_model_config = {model_name: val}

    return new_model_config, opt_paras, original_model_config


def process_config_dict(config_dict: dict, update_initial_guess=True):
    """From a dictionary defining the structure of a neural network, this
    function finds out which entries are hyperparameters"""

    assert isinstance(config_dict, dict)

    opt_paras = {}

    def _process(d):
        for k, v in d.items():
            if isinstance(v, dict) and len(v) > 0:
                _process(v)
            elif v.__class__.__name__ in ["Integer", "Real", "Categorical"]:
                if v.name is None or v.name.startswith("integer_") or v.name.startswith("real_"):
                    v.name = k

                if v.name in opt_paras:
                    raise ValueError(f"""
        Hyperparameter with duplicate name {v.name} found. A hyperparameter to be
        optimized with name '{v.name}' already exists""")

                opt_paras[v.name] = v
                if update_initial_guess:
                    x0 = jsonize(v.rvs(1)[0])  # get initial guess
                    d[k] = x0  # inplace change of dictionary
                else:
                    # we most probably have updated the name, so doing inplace change
                    d[k] = v
        return

    _process(config_dict)
    return config_dict, opt_paras


def update_model_config(config: dict, suggestions:dict)->dict:
    """returns the updated config if config contains any parameter from
    suggestions."""
    cc = copy.deepcopy(config)

    def update(c):
        for k, v in c.items():
            if isinstance(v, dict):
                update(v)
            elif v.__class__.__name__ in ["Integer", "Real", "Categorical"]:
                c[k] = suggestions[v.name]
        return

    update(cc)

    return cc
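
# Illustrative usage (a sketch; assumes ``Integer`` is importable from
# ai4water.hyperopt): spaces inside a layers config are replaced by the
# suggested values, keyed by the space's name.
# >>> from ai4water.hyperopt import Integer
# >>> lyrs = {'Dense': {'units': Integer(8, 64, name='units')}}
# >>> update_model_config(lyrs, {'units': 32})
# {'Dense': {'units': 32}}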


def to_datetime_index(idx_array, fmt='%Y%m%d%H%M') -> pd.DatetimeIndex:
    """ converts a numpy 1d array into pandas DatetimeIndex type."""

    if not isinstance(idx_array, np.ndarray):
        raise TypeError

    idx = pd.to_datetime(idx_array.astype(str), format=fmt)
    idx.freq = pd.infer_freq(idx)
    return idx
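
# Illustrative usage (a sketch, not part of the original source): integer
# timestamps in '%Y%m%d%H%M' form become a pandas DatetimeIndex.
# >>> arr = np.array([202001010000, 202001010100, 202001010200])
# >>> idx = to_datetime_index(arr)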


def jsonize(obj, type_converters: dict = None):
    """
    Serializes an object to python's native types so that it can be saved
    in json file format. If the object is a sequence, then each member of the
    sequence is serialized. Same goes for nested sequences like lists of lists
    or lists of dictionaries.

    Parameters
    ----------
    obj :
        any python object that needs to be serialized.
    type_converters : dict
        a dictionary defining how to serialize any particular type.
        The keys of the dictionary should be a ``type`` and the values
        should be callables to serialize that type.

    Return
    ------
        a serialized python object

    Examples
    --------
    >>> import numpy as np
    >>> from ai4water.utils import jsonize
    >>> a = np.array([2.0])
    >>> b = jsonize(a)
    >>> type(b)  # list
    ...
    ... # if a data container consists of a mix of native and third party types
    ... # only third party types are converted into native types
    >>> print(jsonize({1: [1, None, True, np.array(3)], 'b': np.array([1, 3])}))
    ... {1: [1, None, True, 3], 'b': [1, 3]}

    The user can define the methods to serialize some types, e.g. we can
    serialize tensorflow's tensors using the serialize method

    >>> from tensorflow.keras.layers import Lambda, serialize
    >>> tensor = Lambda(lambda _x: _x[Ellipsis, -1, :])
    >>> jsonize({'my_tensor': tensor}, {Lambda: serialize})
    """
    # boolean type
    if isinstance(obj, bool):
        return obj

    if 'int' in obj.__class__.__name__:
        return int(obj)

    if 'float' in obj.__class__.__name__:
        return float(obj)

    if isinstance(obj, dict):
        return {jsonize(k, type_converters): jsonize(v, type_converters) for k, v in obj.items()}

    if isinstance(obj, tuple):
        return tuple([jsonize(val, type_converters) for val in obj])

    if obj.__class__.__name__ == 'NoneType':
        return obj

    # if obj is a python 'type' such as jsonize(list)
    if type(obj).__name__ == type.__name__:
        return obj.__name__

    if hasattr(obj, '__len__') and not isinstance(obj, str):
        if hasattr(obj, 'shape') and len(obj.shape) == 0:
            # for cases such as np.array(1)
            return jsonize(obj.item(), type_converters)

        if obj.__class__.__name__ in ['Series', 'DataFrame']:
            # a simple list comprehension would iterate over only column names;
            # if we simply do jsonize(obj.values()), it will not save column names
            return {jsonize(k, type_converters): jsonize(v, type_converters) for k, v in obj.items()}

        return [jsonize(val, type_converters) for val in obj]

    if callable(obj):
        if isinstance(obj, FunctionType):
            return obj.__name__
        if hasattr(obj, '__package__'):
            return obj.__package__

    if isinstance(obj, collections_abc.Mapping):
        return dict(obj)

    if obj is Ellipsis:
        return {'class_name': '__ellipsis__'}

    if wrapt and isinstance(obj, wrapt.ObjectProxy):
        return obj.__wrapped__

    if type_converters:
        for _type, converter in type_converters.items():
            if isinstance(obj, _type):
                return converter(obj)

    # last resort, call the __str__ method of the object
    return str(obj)


def make_hpo_results(opt_dir, metric_name='val_loss') -> dict:
    """Looks in opt_dir and saves the min val_loss with the folder name"""
    results = {}
    for folder in os.listdir(opt_dir):
        fname = os.path.join(os.path.join(opt_dir, folder), 'losses.csv')

        if os.path.exists(fname):
            df = pd.read_csv(fname)

            if 'val_loss' in df:
                min_val_loss = round(float(np.nanmin(df[metric_name])), 6)
                results[min_val_loss] = {'folder': os.path.basename(folder)}
    return results


def find_best_weight(w_path: str, best: str = "min", ext: str = ".hdf5",
                     epoch_identifier: int = None):
    """
    Given weights in w_path, find the best weight. If epoch_identifier is
    given, it is given priority when finding the best weights. The file names
    are supposed to be in the following format: FileName_Epoch_Error.ext

    Note: if we are monitoring more than two metrics whose desired behaviour
    is opposite to each other then this method does not work as desired.
    However this can be avoided by specifying `epoch_identifier`.
    """
    assert best in ['min', 'max']
    all_weights = os.listdir(w_path)

    if len(all_weights) == 1:
        return all_weights[0]

    losses = {}
    for w in all_weights:
        wname = w.split(ext)[0]
        try:
            # converting to float so that trailing 0 is removed
            val_loss = str(float(wname.split('_')[2]))
        except (ValueError, IndexError) as e:
            raise ValueError(f"while trying to find best weight in {w_path} with {best} and"
                             f" {ext} and {epoch_identifier} wname: {wname}"
                             f" encountered following error \n{e}")
        losses[val_loss] = {'loss': wname.split('_')[2], 'epoch': wname.split('_')[1]}

    best_weight = None
    if epoch_identifier:
        for v in losses.values():
            if str(epoch_identifier) in v['epoch']:
                best_weight = f"weights_{v['epoch']}_{v['loss']}.hdf5"
                break

    else:
        loss_array = np.array([float(l) for l in losses.keys()])
        if len(loss_array) == 0:
            return None
        best_loss = getattr(np, best)(loss_array)
        best_weight = f"weights_{losses[str(best_loss)]['epoch']}_{losses[str(best_loss)]['loss']}.hdf5"

    return best_weight


def add_folder(opt_dir: str, results: dict) -> Union[dict, None]:

    folders = [file for file in os.listdir(opt_dir) if os.path.isdir(os.path.join(opt_dir, file))]
    num_folders = len(folders)
    results_with_folders = results.copy()
    if num_folders != len(results):
        warnings.warn(f"{num_folders} is not equal to {len(results)} so can not perform ranking")
        return

    for idx, (k, v) in enumerate(results.items()):
        v['folder'] = folders[idx]
        results_with_folders[k] = v

    return results_with_folders


def remove_all_but_best_weights(w_path, best: str = "min", ext: str = ".hdf5"):
    """removes all the weights from a folder except the best weight"""
    best_weights = None
    if os.path.exists(w_path):
        # remove all but best weight
        all_weights = os.listdir(w_path)
        best_weights = find_best_weight(w_path, best=best, ext=ext)
        ws_to_del = [w for w in all_weights if w != best_weights]
        for w in ws_to_del:
            os.remove(os.path.join(w_path, w))

    return best_weights


def clear_weights(
        opt_dir: str,
        results: dict,
        keep: int = None,
        rename: bool = True,
        write: bool = True
):
    """Optimization will save weights of all the trained models, not all of
    them are useful. Here removing weights of all except the top 10%. The
    number of models whose weights are to be retained can be set by the
    `keep` parameter.
    """
    # each value of results is a dictionary which will have a 'folder' key/value
    # pair added to it; the original results dictionary should not be modified.
    results = {k: v.copy() for k, v in results.items()}

    if 'folder' not in list(results.items())[0]:
        results = add_folder(opt_dir, results)
        if results is None:
            return

    if keep is None:
        keep = int(len(results) * 0.1)
    keep = max(keep, 3)

    fname = 'sorted.json'

    d = {k: v['y'] for k, v in results.items()}
    sorted_iters: list = sorted(d, key=d.get)

    # sort the results based on sorted_iters
    results = dict(sorted(results.items(), key=lambda pair: sorted_iters.index(pair[0])))

    best_results = {}
    for idx, v in enumerate(results.values()):

        folder = v['folder']
        _path = os.path.join(opt_dir, folder)
        w_path = os.path.join(_path, 'weights')

        if idx > keep - 1:
            if os.path.exists(w_path):
                rmtree(w_path)
        else:
            best_weights = remove_all_but_best_weights(w_path)
            best_results[folder] = {'path': _path, 'weights': best_weights}

    if rename:
        rank_folders(opt_dir, results, best_results)

    results = {k: jsonize(v) for k, v in results.items()}

    if write:
        sorted_fname = os.path.join(opt_dir, fname)
        with open(sorted_fname, 'w') as sfp:
            json.dump(results, sfp, sort_keys=True, indent=True)

    return best_results


def rank_folders(opt_dir, results, best_results):
    # append ranking of models to folder names
    for idx, v in enumerate(results.values()):

        folder = v['folder']
        old_path = os.path.join(opt_dir, folder)
        new_path = os.path.join(opt_dir, str(idx + 1) + "_" + folder)
        os.rename(old_path, new_path)

        if folder in best_results:
            best_results[folder] = {'path': new_path, 'weights': best_results[folder]}

    return


class TrainTestSplit(object):
    """
    train_test_split of sklearn can not be used for a list of arrays so here we go

    Examples
    --------
    >>> import numpy as np
    >>> from ai4water.utils.utils import TrainTestSplit
    >>> x1 = np.random.random((100, 10, 4))
    >>> x2 = np.random.random((100, 4))
    >>> x = [x1, x2]
    >>> y = np.random.random(100)
    ...
    >>> train_x, test_x, train_y, test_y = TrainTestSplit().split_by_random(x, y)
    >>> # works as well when only a single input i.e. x is provided
    >>> train_x, test_x, _, _ = TrainTestSplit().split_by_random(x)
    ... # if we have time-series like data, where we want to use earlier samples
    ... # for training and later samples for test, then we can do slice based splitting
    >>> train_x, test_x, train_y, test_y = TrainTestSplit().split_by_slicing(x, y)
    """
    def __init__(
            self,
            test_fraction: float = 0.3,
            seed: int = None,
            train_indices: Union[list, np.ndarray] = None,
            test_indices: Union[list, np.ndarray] = None
    ):
        """
        test_fraction:
            test fraction. Must be greater than 0. and less than 1.
        seed:
            random seed for reproducibility
        """
        self.test_fraction = test_fraction
        self.random_state = np.random.RandomState(seed=seed)
        self.train_indices = train_indices
        self.test_indices = test_indices

    def split_by_slicing(
            self,
            x: Union[list, np.ndarray, pd.Series, pd.DataFrame, List[np.ndarray]],
            y: Union[list, np.ndarray, pd.Series, pd.DataFrame, List[np.ndarray]] = None,
    ):
        """splits the x and y by slicing, which is defined by `test_fraction`

        Arguments:
            x: arrays to split
                - array like such as list, numpy array or pandas dataframe/series
                - list of array like objects
            y: array like
                - array like such as list, numpy array or pandas dataframe/series
                - list of array like objects
        """
        def split_arrays(array):
            if isinstance(array, list):
                # x is a list of arrays
                # assert that all arrays are of equal length
                assert len(set([len(_array) for _array in array])) == 1, "arrays are not of the same length"

                split_at = int(array[0].shape[0] * (1. - self.test_fraction))
            else:
                split_at = int(len(array) * (1. - self.test_fraction))

            train, test = (self.slice_arrays(array, 0, split_at),
                           self.slice_arrays(array, split_at))

            return train, test

        train_x, test_x = split_arrays(x)

        if y is not None:
            train_y, test_y = split_arrays(y)
        else:
            train_y, test_y = [], []

        return train_x, test_x, train_y, test_y

    def split_by_random(
            self,
            x: Union[list, np.ndarray, pd.Series, pd.DataFrame, List[np.ndarray]],
            y: Union[list, np.ndarray, pd.Series, pd.DataFrame, List[np.ndarray]] = None,
    ) -> Tuple[Any, Any, Any, Any]:
        """
        splits the x and y by random splitting.

        Arguments:
            x: arrays to split
                - array like such as list, numpy array or pandas dataframe/series
                - list of array like objects
            y: array like
                - array like such as list, numpy array or pandas dataframe/series
                - list of array like objects
        """
        if isinstance(x, list):
            indices = np.arange(len(x[0]))
        else:
            indices = np.arange(len(x))

        indices = self.random_state.permutation(indices)

        split_at = int(len(indices) * (1. - self.test_fraction))
        train_indices, test_indices = (self.slice_arrays(indices, 0, split_at),
                                       self.slice_arrays(indices, split_at))

        train_x = self.slice_with_indices(x, train_indices)
        train_y = self.slice_with_indices(y, train_indices)

        test_x = self.slice_with_indices(x, test_indices)
        test_y = self.slice_with_indices(y, test_indices)

        return train_x, test_x, train_y, test_y

    def split_by_indices(
            self,
            x: Union[list, np.ndarray, pd.Series, pd.DataFrame, List[np.ndarray]],
            y: Union[list, np.ndarray, pd.Series, pd.DataFrame, List[np.ndarray]] = None,
    ):
        """splits the x and y by user defined `train_indices` and `test_indices`"""
        return self.slice_with_indices(x, self.train_indices), \
            self.slice_with_indices(x, self.test_indices), \
            self.slice_with_indices(y, self.train_indices), \
            self.slice_with_indices(y, self.test_indices)

    @staticmethod
    def slice_with_indices(array, indices):
        if array is None:
            return []

        if isinstance(array, list):
            data = []
            for d in array:
                if isinstance(d, (pd.Series, pd.DataFrame)):
                    data.append(d.iloc[indices])
                else:
                    assert isinstance(d, (np.ndarray, pd.DatetimeIndex))
                    data.append(d[indices])
        else:
            if isinstance(array, (pd.DataFrame, pd.Series)):
                data = array.iloc[indices]
            else:
                assert isinstance(array, (np.ndarray, pd.DatetimeIndex))
                data = array[indices]
        return data

    @staticmethod
    def slice_arrays(arrays, start, stop=None):
        if isinstance(arrays, list):
            return [array[start:stop] for array in arrays]
        elif hasattr(arrays, 'shape'):
            return arrays[start:stop]

    def KFold(
            self,
            x,
            y,
            n_splits,
            shuffle=True,
            **kwargs
    ):
        from sklearn.model_selection import KFold
        kf = KFold(n_splits=n_splits,
                   random_state=self.random_state,
                   shuffle=shuffle)
        spliter = kf.split(x[0] if isinstance(x, list) else x)

        return self.yield_splits(x, y, spliter)

    @staticmethod
    def yield_splits(x, y, spliter):

        for tr_idx, test_idx in spliter:

            if isinstance(x, list):
                train_x = [xarray[tr_idx] for xarray in x]
                test_x = [xarray[test_idx] for xarray in x]
            else:
                train_x = x[tr_idx]
                test_x = x[test_idx]

            if isinstance(y, list):
                train_y = [yarray[tr_idx] for yarray in y]
                test_y = [yarray[test_idx] for yarray in y]
            else:
                train_y = y[tr_idx]
                test_y = y[test_idx]

            yield (train_x, train_y), (test_x, test_y)

    def TimeSeriesSplit(
            self,
            x,
            y,
            **kwargs
    ):
        from sklearn.model_selection import TimeSeriesSplit
        kf = TimeSeriesSplit(**kwargs)
        spliter = kf.split(x[0] if isinstance(x, list) else x)

        return self.yield_splits(x, y, spliter)

    def ShuffleSplit(
            self,
            x,
            y,
            *args,
            **kwargs
    ):
        from sklearn.model_selection import ShuffleSplit
        kf = ShuffleSplit(*args, **kwargs)
        spliter = kf.split(x[0] if isinstance(x, list) else x)

        return self.yield_splits(x, y, spliter)

    def LeaveOneOut(
            self,
            x,
            y,
            **kwargs
    ):
        from sklearn.model_selection import LeaveOneOut
        kf = LeaveOneOut()
        spliter = kf.split(x[0] if isinstance(x, list) else x)

        return self.yield_splits(x, y, spliter)
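
# Illustrative usage (a sketch, not part of the original source): 5-fold cross
# validation over arrays; each iteration yields (train_x, train_y), (test_x, test_y).
# >>> x = np.random.random((50, 4))
# >>> y = np.random.random(50)
# >>> for (tr_x, tr_y), (te_x, te_y) in TrainTestSplit(seed=313).KFold(x, y, n_splits=5):
# ...     print(tr_x.shape, te_x.shape)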


def ts_features(
        data: Union[np.ndarray, pd.DataFrame, pd.Series],
        precision: int = 3,
        name: str = '',
        st: int = 0,
        en: int = None,
        features: Union[list, str] = None
) -> dict:
    """
    Extracts features from 1d time series data. Features can be
        * point, one integer or float point value, for example mean
        * 1D, a 1D array, for example sin(data)
        * 2D, a 2D array, for example wavelet transform

    Gets all the possible stats about an array like object `data`.

    Arguments:
        data: array like
        precision: number of significant figures
        name: str, only for error or warning messages
        st: str/int, starting index of data to be considered.
        en: str/int, end index of data to be considered.
        features: name/names of features to extract from data.
    """
    # information holding degree
    point_features = {
        'Skew': skew,
        'Kurtosis': kurtosis,
        'Mean': np.nanmean,
        'Geometric Mean': gmean,
        'Standard error of mean': scipy.stats.sem,
        'Median': np.nanmedian,
        'Variance': np.nanvar,
        'Coefficient of Variation': variation,
        'Std': np.nanstd,
        'Non Zeros': np.count_nonzero,
        'Min': np.nanmin,
        'Max': np.nanmax,
        'Sum': np.nansum,
        'Counts': np.size
    }

    point_features_lambda = {
        'Shannon entropy': lambda x: np.round(scipy.stats.entropy(pd.Series(x).value_counts()), precision),
        'Negative counts': lambda x: int(np.sum(x < 0.0)),
        '90th percentile': lambda x: np.round(np.nanpercentile(x, 90), precision),
        '75th percentile': lambda x: np.round(np.nanpercentile(x, 75), precision),
        '50th percentile': lambda x: np.round(np.nanpercentile(x, 50), precision),
        '25th percentile': lambda x: np.round(np.nanpercentile(x, 25), precision),
        '10th percentile': lambda x: np.round(np.nanpercentile(x, 10), precision),
    }

    if not isinstance(data, np.ndarray):
        if hasattr(data, '__len__'):
            data = np.array(data)
        else:
            raise TypeError(f"{name} must be array like but it is of type {data.__class__.__name__}")

    if np.array(data).dtype.type is np.str_:
        warnings.warn(f"{name} contains string values")
        return {}

    if 'int' not in data.dtype.name:
        if 'float' not in data.dtype.name:
            warnings.warn(f"changing the dtype of {name} from {data.dtype.name} to float")
            data = data.astype(np.float64)

    assert data.size == len(data), f"""
data must be 1 dimensional array but it has shape {np.shape(data)}
"""
    data = data[st:en]
    stats = dict()

    if features is None:
        features = list(point_features.keys()) + list(point_features_lambda.keys())
    elif isinstance(features, str):
        features = [features]

    for feat in features:
        if feat in point_features:
            stats[feat] = np.round(point_features[feat](data), precision)
        elif feat in point_features_lambda:
            stats[feat] = point_features_lambda[feat](data)

    if 'Harmonic Mean' in features:
        try:
            stats['Harmonic Mean'] = np.round(hmean(data), precision)
        except ValueError:
            warnings.warn(f"""Unable to calculate Harmonic mean for {name}. Harmonic mean
            is only defined if all elements are greater than or equal to zero""", UserWarning)

    return jsonize(stats)
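
# Illustrative usage (a sketch, not part of the original source): a dictionary
# of point statistics for a 1d array.
# >>> stats = ts_features(np.random.random(100), precision=2, name='rainfall')
# >>> stats['Mean'], stats['Max']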


def prepare_data(
        data: np.ndarray,
        lookback: int,
        num_inputs: int = None,
        num_outputs: int = None,
        input_steps: int = 1,
        forecast_step: int = 0,
        forecast_len: int = 1,
        known_future_inputs: bool = False,
        output_steps: int = 1,
        mask: Union[int, float, np.ndarray] = None
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Converts a numpy nd array into a supervised machine learning problem.

    Parameters
    ----------
    data :
        nd numpy array whose first dimension represents the number of examples and
        the second dimension represents the number of features. Some of those
        features will be used as inputs and some will be considered as outputs
        depending upon the values of `num_inputs` and `num_outputs`.
    lookback :
        number of previous steps/values to be used at one step.
    num_inputs :
        default None, number of input features in data. If None, it will be
        calculated as features - outputs. The input data will be all from start
        till num_outputs in the second dimension.
    num_outputs :
        number of columns (from last) in data to be used as output.
        If None, it will be calculated as features - inputs.
    input_steps:
        strides/number of steps in input data
    forecast_step :
        must be greater than or equal to 0; it defines which t+i-th value to use
        as target, where i is the horizon. For time series prediction, we can
        say, which horizon to predict.
    forecast_len :
        number of horizons/future values to predict.
    known_future_inputs :
        Only useful if `forecast_len`>1. If True, this means we know and use
        'future inputs' while making predictions at t>0
    output_steps :
        step size in outputs. If =2, it means we want to predict every second
        value from the targets
    mask :
        If int, then the examples with these values in the output will be skipped.
        If array, then it must be a boolean mask indicating which examples to
        include/exclude. The length of mask should be equal to the number of
        generated examples. The number of generated examples is difficult to
        know beforehand because it depends upon lookback, input_steps, and
        forecast_step. Thus it is better to provide an integer indicating which
        values in outputs are to be considered as invalid. Default is None,
        which indicates all the generated examples will be returned.

    Returns
    -------
    x : numpy array of shape (examples, lookback, ins) consisting of
        input examples
    prev_y : numpy array consisting of previous outputs
    y : numpy array consisting of target values

    Given following data consisting of input/output pairs

    +--------+--------+---------+---------+----------+
    | input1 | input2 | output1 | output2 | output 3 |
    +========+========+=========+=========+==========+
    | 1      | 11     | 21      | 31      | 41       |
    +--------+--------+---------+---------+----------+
    | 2      | 12     | 22      | 32      | 42       |
    +--------+--------+---------+---------+----------+
    | 3      | 13     | 23      | 33      | 43       |
    +--------+--------+---------+---------+----------+
    | 4      | 14     | 24      | 34      | 44       |
    +--------+--------+---------+---------+----------+
    | 5      | 15     | 25      | 35      | 45       |
    +--------+--------+---------+---------+----------+
    | 6      | 16     | 26      | 36      | 46       |
    +--------+--------+---------+---------+----------+
    | 7      | 17     | 27      | 37      | 47       |
    +--------+--------+---------+---------+----------+

    If we use following 2 time series as input

    +--------+--------+
    | input1 | input2 |
    +========+========+
    | 1      | 11     |
    +--------+--------+
    | 2      | 12     |
    +--------+--------+
    | 3      | 13     |
    +--------+--------+
    | 4      | 14     |
    +--------+--------+
    | 5      | 15     |
    +--------+--------+
    | 6      | 16     |
    +--------+--------+
    | 7      | 17     |
    +--------+--------+

    then ``num_inputs`` =2, ``lookback`` =7, ``input_steps`` =1

    and if we want to predict

    +---------+---------+----------+
    | output1 | output2 | output 3 |
    +=========+=========+==========+
    | 27      | 37      | 47       |
    +---------+---------+----------+

    then ``num_outputs`` =3, ``forecast_len`` =1, ``forecast_step`` =0,

    if we want to predict

    +---------+---------+----------+
    | output1 | output2 | output 3 |
    +=========+=========+==========+
    | 28      | 38      | 48       |
    +---------+---------+----------+

    then ``num_outputs`` =3, ``forecast_len`` =1, ``forecast_step`` =1,

    if we want to predict

    +---------+---------+----------+
    | output1 | output2 | output 3 |
    +=========+=========+==========+
    | 27      | 37      | 47       |
    +---------+---------+----------+
    | 28      | 38      | 48       |
    +---------+---------+----------+

    then ``num_outputs`` =3, ``forecast_len`` =2, horizon/forecast_step=0,

    if we want to predict

    +---------+---------+----------+
    | output1 | output2 | output 3 |
    +=========+=========+==========+
    | 28      | 38      | 48       |
    +---------+---------+----------+
    | 29      | 39      | 49       |
    +---------+---------+----------+
    | 30      | 40      | 50       |
    +---------+---------+----------+

    then ``num_outputs`` =3, ``forecast_len`` =3, ``forecast_step`` =1,

    if we want to predict

    +---------+
    | output2 |
    +=========+
    | 38      |
    +---------+
    | 39      |
    +---------+
    | 40      |
    +---------+

    then ``num_outputs`` =1, ``forecast_len`` =3, ``forecast_step`` =0

    if we predict

    +---------+
    | output2 |
    +=========+
    | 39      |
    +---------+

    then ``num_outputs`` =1, ``forecast_len`` =1, ``forecast_step`` =2

    if we predict

    +---------+
    | output2 |
    +=========+
    | 39      |
    +---------+
    | 40      |
    +---------+
    | 41      |
    +---------+

    then ``num_outputs`` =1, ``forecast_len`` =3, ``forecast_step`` =2

    If we use following two time series as input

    +--------+--------+
    | input1 | input2 |
    +========+========+
    | 1      | 11     |
    +--------+--------+
    | 3      | 13     |
    +--------+--------+
    | 5      | 15     |
    +--------+--------+
    | 7      | 17     |
    +--------+--------+

    then ``num_inputs`` =2, ``lookback`` =4, ``input_steps`` =2

    If the input is

    +--------+--------+
    | input1 | input2 |
    +========+========+
    | 1      | 11     |
    +--------+--------+
    | 2      | 12     |
    +--------+--------+
    | 3      | 13     |
    +--------+--------+
    | 4      | 14     |
    +--------+--------+
    | 5      | 15     |
    +--------+--------+
    | 6      | 16     |
    +--------+--------+
    | 7      | 17     |
    +--------+--------+

    and target/output is

    +---------+---------+----------+
    | output1 | output2 | output 3 |
    +=========+=========+==========+
    | 25      | 35      | 45       |
    +---------+---------+----------+
    | 26      | 36      | 46       |
    +---------+---------+----------+
    | 27      | 37      | 47       |
    +---------+---------+----------+

    This means we make use of ``known future inputs``. This can be achieved
    using the following configuration
    num_inputs=2, num_outputs=3, lookback=4, forecast_len=3, forecast_step=1,
    known_future_inputs=True

    The general shape of output/target/label is
    (examples, num_outputs, forecast_len)

    The general shape of inputs/x is
    (examples, lookback + forecast_len - 1, ....num_inputs)

    Examples:
        >>> import numpy as np
        >>> from ai4water.utils.utils import prepare_data
        >>> num_examples = 50
        >>> dataframe = np.arange(int(num_examples * 5)).reshape(-1, num_examples).transpose()
        >>> dataframe[0:10]
        array([[  0,  50, 100, 150, 200],
               [  1,  51, 101, 151, 201],
               [  2,  52, 102, 152, 202],
               [  3,  53, 103, 153, 203],
               [  4,  54, 104, 154, 204],
               [  5,  55, 105, 155, 205],
               [  6,  56, 106, 156, 206],
               [  7,  57, 107, 157, 207],
               [  8,  58, 108, 158, 208],
               [  9,  59, 109, 159, 209]])
        >>> x, prevy, y = prepare_data(dataframe, num_outputs=2, lookback=4,
        ...    input_steps=2, forecast_step=2, forecast_len=4)
        >>> x[0]
        array([[  0.,  50., 100.],
               [  2.,  52., 102.],
               [  4.,  54., 104.],
               [  6.,  56., 106.]], dtype=float32)
        >>> y[0]
        array([[158., 159., 160., 161.],
               [208., 209., 210., 211.]], dtype=float32)

        >>> x, prevy, y = prepare_data(dataframe, num_outputs=2, lookback=4,
        ...    forecast_len=3, known_future_inputs=True)
        >>> x[0]
        array([[  0,  50, 100],
               [  1,  51, 101],
               [  2,  52, 102],
               [  3,  53, 103],
               [  4,  54, 104],
               [  5,  55, 105],
               [  6,  56, 106]])  # (7, 3)
        >>> # it is important to note that although lookback=4, x[0] has a length of 7
        >>> y[0]
        array([[154., 155., 156.],
               [204., 205., 206.]], dtype=float32)  # (2, 3)
    """
    if not isinstance(data, np.ndarray):
        if isinstance(data, pd.DataFrame):
            data = data.values
        else:
            raise TypeError(f"unknown data type for data {data.__class__.__name__}")

    if num_inputs is None and num_outputs is None:
        raise ValueError("""
Either of num_inputs or num_outputs must be provided.
""")

    features = data.shape[1]
    if num_outputs is None:
        num_outputs = features - num_inputs

    if num_inputs is None:
        num_inputs = features - num_outputs

    assert num_inputs + num_outputs == features, f"""
num_inputs {num_inputs} + num_outputs {num_outputs} != total features {features}"""

    if len(data) <= 1:
        raise ValueError(f"Can not create batches from data with shape {data.shape}")

    time_steps = lookback
    if known_future_inputs:
        lookback = lookback + forecast_len
        assert forecast_len > 1, f"""
known_future_inputs should be True only when making predictions at multiple
horizons i.e. when forecast length/number of horizons to predict is > 1.
known_future_inputs: {known_future_inputs}
forecast_len: {forecast_len}"""

    examples = len(data)

    x = []
    prev_y = []
    y = []

    for i in range(examples - lookback * input_steps + 1 - forecast_step - forecast_len + 1):

        stx, enx = i, i + lookback * input_steps
        x_example = data[stx:enx:input_steps, 0:features - num_outputs]

        st, en = i, i + (lookback - 1) * input_steps
        y_data = data[st:en:input_steps, features - num_outputs:]

        sty = (i + time_steps * input_steps) + forecast_step - input_steps
        eny = sty + forecast_len
        target = data[sty:eny, features - num_outputs:]

        x.append(np.array(x_example))
        prev_y.append(np.array(y_data))
        y.append(np.array(target))

    x = np.stack(x)
    prev_y = np.array([np.array(i, dtype=np.float32) for i in prev_y], dtype=np.float32)
    # transpose because we want labels to be of shape (examples, outs, forecast_len)
    y = np.array([np.array(i, dtype=np.float32).T for i in y], dtype=np.float32)

    if mask is not None:
        if isinstance(mask, np.ndarray):
            assert mask.ndim == 1
            assert len(x) == len(mask), f"Number of generated examples are {len(x)} " \
                                        f"but the length of mask is {len(mask)}"
        elif isinstance(mask, float) and np.isnan(mask):
            mask = np.invert(np.isnan(y))
            mask = np.array([all(i.reshape(-1,)) for i in mask])
        else:
            assert isinstance(mask, int), f"""
Invalid mask identifier given of type: {mask.__class__.__name__}"""
            mask = y != mask
            mask = np.array([all(i.reshape(-1,)) for i in mask])

        x = x[mask]
        prev_y = prev_y[mask]
        y = y[mask]

    return x, prev_y, y


def find_tot_plots(features, max_subplots):

    tot_plots = np.linspace(0, features,
                            int(features / max_subplots) + 1 if features % max_subplots == 0
                            else int(features / max_subplots) + 2)
    # converting each value to int because linspace can return an array containing floats if features is odd
    tot_plots = [int(i) for i in tot_plots]

    return tot_plots


class JsonEncoder(json.JSONEncoder):

    def default(self, obj):
        if 'int' in obj.__class__.__name__:
            return int(obj)
        elif 'float' in obj.__class__.__name__:
            return float(obj)
        elif isinstance(obj, np.ndarray):
            return obj.tolist()
        elif 'bool' in obj.__class__.__name__:
            return bool(obj)
        elif callable(obj) and hasattr(obj, '__module__'):
            if isinstance(obj, FunctionType):
                return obj.__name__
            else:
                return obj.__module__
        else:
            return super(JsonEncoder, self).default(obj)


def plot_activations_along_inputs(
        data: np.ndarray,
        activations: np.ndarray,
        observations: np.ndarray,
        predictions: np.ndarray,
        in_cols: list,
        out_cols: list,
        lookback: int,
        name: str,
        path: str,
        vmin=None,
        vmax=None,
        show=False
):
    # activations must be of shape (num_examples, lookback, input_features)
    assert activations.shape[1] == lookback
    assert activations.shape[2] == len(in_cols), f'{activations.shape}, {len(in_cols)}'

    # data is of shape (num_examples, input_features)
    assert data.shape[1] == len(in_cols)

    assert len(data) == len(activations)

    for out in range(len(out_cols)):
        pred = predictions[:, out]
        if observations is None:
            obs = None
        else:
            obs = observations[:, out]
        out_name = out_cols[out]

        for idx in range(len(in_cols)):

            plt.close('all')
            fig, (ax1, ax2, ax3) = plt.subplots(3, sharex='all')
            fig.set_figheight(12)

            ax1.plot(data[:, idx], label=in_cols[idx])
            ax1.legend()
            ax1.set_title('activations w.r.t ' + in_cols[idx])
            ax1.set_ylabel(in_cols[idx])

            ax2.plot(pred, label='Prediction')
            if obs is not None:
                ax2.plot(obs, '.', label='Observed')
            ax2.legend()

            ytick_labels = [f"t-{int(i)}" for i in np.linspace(lookback - 1, 0, lookback)]
            im = imshow(
                activations[:, :, idx].transpose(),
                vmin=vmin,
                vmax=vmax,
                aspect="auto",
                ax=ax3,
                ax_kws=dict(xlabel="Examples", ylabel="lookback steps"),
                show=False,
                yticklabels=ytick_labels
            )

            fig.colorbar(im, orientation='horizontal', pad=0.2)
            plt.subplots_adjust(wspace=0.005, hspace=0.005)
            _name = f'attn_weights_{out_name}_{name}_'
            plt.savefig(os.path.join(path, _name) + in_cols[idx], dpi=400, bbox_inches='tight')

            if show:
                plt.show()

            plt.close('all')
    return


class DataNotFound(Exception):

    def __init__(self, source):
        self.source = source

    def __str__(self):
        return f"""
        Unable to get {self.source} data. You must specify the data either
        using the 'x' or 'data' keywords."""


def print_something(something, prefix=''):
    """prints shape of some python object"""
    if hasattr(something, "shape"):
        print(f"{prefix} shape: ", something.shape)
    elif isinstance(something, list):
        print(f"{prefix} shape: ", [thing.shape for thing in something if hasattr(thing, "shape")])
    elif isinstance(something, dict):
        print(f"{prefix} shape: ")
        pprint.pprint({k: v.shape for k, v in something.items() if hasattr(v, "shape")}, width=40)
    else:
        print(something)


def maybe_three_outputs(data, teacher_forcing=False):
    """num_outputs: how many outputs from data we want"""
    if teacher_forcing:
        num_outputs = 3
    else:
        num_outputs = 2

    if num_outputs == 2:
        if len(data) == 2:
            return data[0], data[1]
        elif len(data) == 3:
            return data[0], data[2]
    else:
        if len(data) == 3:
            return [data[0], data[1]], data[2]

    # DA, IA-LSTM models return [x, prev_y], y even when teacher_forcing is on!
    return data


def get_version_info(
        **kwargs
) -> dict:
    """returns version information of all the packages which are used by
    different modules of ai4water.
    """
    import sys
    from ai4water.backend import lightgbm, tcn, catboost, xgboost, easy_mpl, SeqMetrics
    from ai4water.backend import tf, keras, torch
    from ai4water.backend import np, pd, mpl
    from ai4water.backend import h5py
    from ai4water.backend import sklearn, shapefile, xr, netCDF4
    from ai4water.backend import optuna, skopt, hyperopt, plotly
    from ai4water.backend import fiona
    from ai4water.backend import lime, sns
    from ai4water import __version__

    info = {'python': sys.version, 'os': os.name, 'ai4water': __version__}

    if kwargs.get('tf', None):
        tf = kwargs['tf']
        info['tf_is_built_with_cuda'] = tf.test.is_built_with_cuda()
        info['is_built_with_gpu_support'] = tf.test.is_built_with_gpu_support()
        info['tf_is_gpu_available'] = tf.test.is_gpu_available()
        info['eager_execution'] = tf.executing_eagerly()

    for lib in [
        lightgbm, tcn, catboost, xgboost, easy_mpl, SeqMetrics,
        tf, keras, torch,
        np, pd, mpl, h5py,
        sklearn, shapefile, fiona, xr, netCDF4,
        optuna, skopt, hyperopt, plotly,
        lime, sns
    ]:
        if lib is not None:
            info[getattr(lib, '__name__')] = getattr(lib, '__version__', 'NotDefined')

    return info


def check_attributes(model, attributes):
    for method in attributes:
        if not hasattr(model, method):
            raise ValueError(f"your custom class does not have {method}")


def get_nrows_ncols(n_rows, n_subplots) -> "tuple[int, int]":

    if n_rows is None:
        n_rows = int(np.sqrt(n_subplots))
    n_cols = max(int(n_subplots / n_rows), 1)  # ensure n_cols != 0
    n_rows = int(n_subplots / n_cols)

    while not ((n_subplots / n_cols).is_integer() and
               (n_subplots / n_rows).is_integer()):
        n_cols -= 1
        n_rows = int(n_subplots / n_cols)
    return n_rows, n_cols


METRIC_TYPES = {
    "r2": "max",
    "nse": "max",
    "r2_score": "max",
    "kge": "max",
    'log_nse': 'max',
    "corr_coeff": "max",
    'accuracy': "max",
    'f1_score': 'max',
    "mse": "min",
    "rmse": "min",
    "rmsle": "min",
    "mape": "min",
    "nrmse": "min",
    "pbias": "min",
    "bias": "min",
    "med_seq_error": "min",
}


class AttribtueSetter(object):

    def __init__(self, obj, y: np.ndarray, from_fit=None):

        if obj.mode is None:
            if 'float' in y.dtype.name:
                obj.mode = "regression"
            else:
                obj.mode = "classification"
            warnings.warn(f"inferred mode is {obj.mode}. "
                          f"Ignore this message if the inferred mode is correct.")

        self.mode = obj.mode

        obj.classes_ = self.classes(y)  # for sklearn
        obj.num_classes_ = len(obj.classes_)
        obj.is_binary_ = self.is_binary(y)
        outs = getattr(obj, 'output_features', '') or ''
        obj.is_multiclass_ = self.is_multiclass(y, outs)
        obj.is_multilabel_ = self.is_multilabel(outs)

        obj.is_fitted_ = from_fit

        return

    def is_multiclass(self, y, output_features='') -> bool:
        """Returns True if the problem is multiclass classification"""
        _default = False
        if self.mode == 'classification':
            # also consider 0 because when output_features is None/'', its length will be 0
            if len(output_features) <= 1:
                if len(self.classes(y)) > 2:
                    _default = True
            # elif len(y) == y.size:  # this means the names of
            #     pass
            else:
                pass  # todo, check when output columns are one-hot encoded

        return _default

    def is_multilabel(self, output_features=''):
        if self.mode == "classification":
            if len(output_features) > 1:
                return True
        return False

    def classes(self, y: np.ndarray):
        if self.mode == "regression":
            return []

        if len(y) != y.size:  # nd array, one hot encoded
            return [i for i in range(y.shape[-1])]

        return list(np.unique(y[~np.isnan(y)]))

    def is_binary(self, y):
        if self.mode == "regression":
            return False

        if len(y) != y.size:  # nd array, may be one hot encoded
            if y.shape[-1] == 2 and len(np.unique(y[~np.isnan(y)])) == 2:
                return True  # binary, one hot encoded
            return False

        if len(np.unique(y[~np.isnan(y)])) == 2:
            return True
        return False


def get_values(outputs):
    if isinstance(outputs, (dict, list)) and len(outputs) == 1:
        outputs = list(outputs.values())[0]

    return outputs


def create_subplots(*args, **kwargs):
    try:
        from pandas.plotting._matplotlib.tools import create_subplots
    except ImportError:  # for older pandas versions
        from pandas.plotting._matplotlib.tools import _subplots as create_subplots

    return create_subplots(*args, **kwargs)


def mad(*args, **kwargs):
    try:
        from scipy.stats import median_abs_deviation as _mad
    except ImportError:
        from scipy.stats import median_absolute_deviation as _mad

    return _mad(*args, **kwargs)
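
# Illustrative usage (a sketch, not part of the original source): choose a
# near-rectangular grid for 10 subplots.
# >>> get_nrows_ncols(None, 10)
# (5, 2)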