import json
import inspect
import warnings
from typing import Union
from copy import copy, deepcopy
import ai4water.datasets as datasets
from ai4water.datasets import all_datasets
from ai4water.utils.utils import TrainTestSplit
from ai4water.utils.plotting_tools import Plots
from ai4water.preprocessing.imputation import Imputation
from ai4water.utils.utils import prepare_data, jsonize, to_datetime_index, print_something
from ai4water.backend import np, pd, plt, os, mpl, sklearn, h5py
from .utils import check_for_classification
from .utils import consider_intervals, decode
from .utils import load_data_from_hdf5
train_test_split = sklearn.model_selection.train_test_split
KFold = sklearn.model_selection.KFold
LeaveOneOut = sklearn.model_selection.LeaveOneOut
TimeSeriesSplit = sklearn.model_selection.TimeSeriesSplit
ShuffleSplit = sklearn.model_selection.ShuffleSplit
Patch = mpl.patches.Patch
cmap_cv = plt.cm.coolwarm
class _DataSet(Plots):
def __init__(self, config, path=os.getcwd()):
Plots.__init__(self, config=config, path=path)
def training_data(self):
raise NotImplementedError
def validation_data(self):
raise NotImplementedError
def test_data(self):
raise NotImplementedError
def KFold_splits(self, n_splits=5):
raise NotImplementedError
def LeaveOneOut_splits(self):
raise NotImplementedError
def TimeSeriesSplit_splits(self, n_splits=5):
raise NotImplementedError
@classmethod
def from_h5(cls, h5_file: str):
raise NotImplementedError
def to_disk(self, path: str):
raise NotImplementedError
def return_xy(self, x, y, initial):
if self.mode == "classification" and self.is_binary:
if len(y) == y.size:
y = y.reshape(-1, 1)
if self.verbosity > 0:
print(f"{'*' * 5} {initial} {'*' * 5}")
print_something(x, "input_x")
print_something(y, "target")
return x, y
def return_x_yy(self, x, prev_y, y, initial):
if self.verbosity > 0:
print(f"{'*' * 5} {initial} data {'*' * 5}")
print_something(x, "input_x")
print_something(prev_y, "prev_y")
print_something(y, "target")
return x, prev_y, y
[docs]class DataSet(_DataSet):
"""The purpose of DataSet is to convert unprepared/raw data into prepared data.
A prepared data consists of x,y pairs where x is inputs and y is outputs. There
are >1 examples in a DataSet. Both inputs and outputs consists of same number
of examples. An example consists of one input, output pair which can be given
to a supervised machine learning algorithm for training. For tabular data, the
number of examples does not necessarily match number of rows. The number of
examples depend upon multiple factors such as presence of intervals, how
nans are handled and the arguments related to time series data preparation
which are listed in detail in prepare_data function.
DataSet class can accept the raw, unprepared data in a variety of formats such
as .csv, .xlsx, .parquet, .mat, .n5 etc. For details see this. The DataSet
class can save the prepared data into an hdf5 file which can susequently be
used to load the data and save the time.
Methods
------------
- training_data: returns training data
- validation_data: returns validation data
- test_data: returns test data
- from_h5:
- to_disk
- KFold_splits: creates splits using `KFold` of sklearn
- LeaveOneOut_splits: creates splits using `LeaveOneOut` of sklearn
- TimeSeriesSplit_splits: creates splits using `TimeSeriesSplit` of sklearn
- total_exs
"""
[docs] def __init__(
self,
data,
input_features: Union[str, list] = None,
output_features: Union[str, list] = None,
dataset_args: dict = None,
ts_args: dict = None,
split_random: bool = False,
train_fraction: float = 0.7,
val_fraction: float = 0.2,
indices: dict = None,
intervals=None,
shuffle: bool = True,
allow_nan_labels: int = 0,
nan_filler: dict = None,
batch_size: int = 32,
drop_remainder: bool = False,
teacher_forcing: bool = False,
allow_input_nans: bool = False,
seed: int = 313,
verbosity: int = 1,
mode: str = None,
category: str = None,
save: bool = False
):
"""
Initializes the DataSet class
Parameters
----------
data :
source from which to make the data. It can be one of the following:
- pandas dataframe: each columns is a feature and each row is an example
- numpy array
- xarray dataset: it can be xarray dataset
- path like: if the path is the path of a file, then this file can
be a csv/xlsx/nc/npz/mat/parquet/feather file. The .nc file
will be read using xarray to load datasets. If the path refers
to a directory, it is supposed that each file in the directory refers to one example.
- ai4water dataset : name of any of dataset name from ai4water.datasets
- name of .h5 file
input_features : Union[list, dict, str, None]
features to use as input. If `data` is pandas dataframe
then this is list of column names from `data` to be used as input.
output_features : Union[list, dict, str, None]
features to use as output. When `data` is dataframe
then it is list of column names from `data` to be used as output.
If `data` is `dict`, then it must be consistent with `data`.
Default is None,which means the last column of data will be
used as output. In case of multi-class classification, the output
column is not supposed to be one-hot-encoded rather in the form
of [0,1,2,0,1,2,1,2,0] for 3 classes. One-hot-encoding is done
inside the model.
dataset_args : dict
additional arguments for AI4Water's [datasets][ai4water.datasets]
ts_args : dict, optional
This argument should only be used if the data is time series data.
It must be a dictionary which is then passed to :py:func:`ai4water.utils.prepare_data`
for data preparation. Possible keys in dictionay are:
- lookback
- forecast_len
- forecast_step
- input_steps
split_random : bool, optional
whether to split the data into training and test randomly or not.
train_fraction : float
Fraction of the complete data to be used for training
purpose. Must be greater than 0.0.
val_fraction : float
The fraction of the training data to be used for validation.
Set to 0.0 if no validation data is to be used.
indices : dict, optional
A dictionary with two possible keys, 'training', 'validation'.
It determines the indices to be used to select training, validation
and test data. If indices are given for training, then train_fraction
must not be given. If indices are given for validation, then indices
for training must also be given and val_fraction must not be given.
Therefore, the possible keys in indices dictionary are follwoing
- ``training``
- ``training`` and ``validation``
intervals :
tuple of tuples where each tuple consits of two integers, marking
the start and end of interval. An interval here means indices
from the data. Only rows within those indices will be used when preparing
data/batches for NN. This is handy when our input data
contains chunks of missing values or when we don't want to consider several
rows in input data during data_preparation.
For further usage see `examples/using_intervals`
shuffle : bool
whether to shuffle the samples or not
allow_nan_labels : bool
whether to allow examples with nan labels or not.
if it is > 0, and if target values contain Nans, those examples
will not be ignored and will be used as it is.
In such a case a customized training and evaluation
step is performed where the loss is not calculated for predictions
corresponding to nan observations. Thus this option can be useful
when we are predicting more than 1 target and some of the examples
have some of their labels missing. In such a scenario, if we set this
option to >0, we don't need to ignore those samples at all during data
preparation. This option should be set to > 0 only when using tensorflow
for deep learning models. if == 1, then if an example has label [nan, 1]
it will not be removed while the example with label [nan, nan]
will be ignored/removed. If ==2, both examples (mentioned before) will be
considered/will not be removed. This means for multi-outputs, we can end
up having examples whose all labels are nans. if the number of outputs
are just one. Then this must be set to 2 in order to use samples with nan labels.
nan_filler : dict
This argument determines the imputation technique used to fill the nans in
the data. The imputation is actually performed by :py:class:`ai4water.preprocessing.Imputation`
class. Therefore this argument determines the interaction with `Imputation` class.
The default value is None, which will raise error if missing/nan values
are encountered in the input data. The user can however specify a
dictionary whose one key must be `method`. The value of 'method'
key can be `fillna` or `interpolate`. For example, to do forward
filling, the user can do as following
>>> {'method': 'fillna', 'imputer_args': {'method': 'ffill'}}
For details about fillna keyword options see fillna_
For `interpolate`, the user can specify the type of interpolation
for example
>>> {'method': 'interpolate', 'imputer_args': {'method': 'spline', 'order': 2}}
will perform spline interpolation with 2nd order.
For other possible options/keyword arguments for interpolate_
[see]()
The filling or interpolation is done columnwise, however, the user
can specify how to do for each column by providing the above mentioned
arguments as dictionary or list. The sklearn based imputation methods
can also be used in a similar fashion. For KNN
>>> {'method': 'KNNImputer', 'imputer_args': {'n_neighbors': 3}}
or for iterative imputation
>>> {'method': 'IterativeImputer', 'imputer_args': {'n_nearest_features': 2}}
To pass additional arguments one can make use of `imputer_args`
keyword argument
>>> {'method': 'KNNImputer', 'features': ['b'], 'imputer_args': {'n_neighbors': 4}},
For more on sklearn based imputation methods see this blog_
batch_size : int
size of one batch. Only relevent if `drop_remainder` is True.
drop_remainder : bool
whether to drop the remainder if len(data) % batch_size != 0 or not?
teacher_forcing : bool
whether to return previous output/target/ground
truth or not. This is useful when the user wants to feed output
at t-1 as input at timestep t. For details about this technique
see this article_
allow_input_nans : bool, optional
If False, the examples containing nans in inputs will be removed.
Setting this to True will result in feeding nan containing data
to your algorithm unless nans are filled with `nan_filler`.
seed : int
random seed for reproducibility
verbosity : int
mode : str
either ``regression`` or ``classification``
category : str
save : bool
whether to save the data in an h5 file or not.
Example
-------
>>> import pandas as pd
>>> import numpy as np
>>> from ai4water.preprocessing import DataSet
>>> data_ = pd.DataFrame(np.random.randint(0, 1000, (50, 2)), columns=['input', 'output'])
>>> data_set = DataSet(data=data_, ts_args={'lookback':5})
>>> x,y = data_set.training_data()
.. _fillna:
https://pandas.pydata.org/pandas-docs/version/0.22.0/generated/pandas.DataFrame.fillna.html
.. _article:
https://machinelearningmastery.com/teacher-forcing-for-recurrent-neural-networks/
.. _interpolate:
https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.interpolate.html
.. _blog:
https://scikit-learn.org/stable/auto_examples/impute/plot_missing_values.html#sphx-glr-auto-examples-impute-plot-missing-values-py
Note
----
The word 'index' is not allowed as column name, input_features or output_features
"""
indices = indices or {}
if indices:
assert split_random is False, "indices cannot be used with split_random"
if 'training' in indices:
assert train_fraction == 0.7, f"""
You can not set training data using both indices and train_fraction.
Use either indices or train_fraction."""
if 'validation' in indices:
assert val_fraction == 0.2, f"""
You can not set validation data using both indices and val_fraction.
Use either indices or val_fraction."""
assert 'training' in indices, f"""
when defining validation data using indices, training data must also be
defined using indices."""
assert val_fraction < 1.0, f"""
val_fraction must be less than 1.0 but it is {val_fraction}.
"""
self.dataset_args = dataset_args
self.config = {
'input_features': input_features,
'output_features': output_features
}
self.nan_filler = nan_filler
self.data = self._process_data(
data,
input_features,
output_features)
self.ts_args = ts_args
self.split_random = split_random
self.indices = indices
self.train_fraction = train_fraction
self.val_fraction = val_fraction
self.shuffle = shuffle
self.batch_size = batch_size
self.intervals = intervals
self.allow_nan_labels = allow_nan_labels
self.teacher_forcing = teacher_forcing
self.drop_remainder = drop_remainder
self.allow_input_nans = allow_input_nans
self.verbosity = verbosity
self.seed = seed
self.mode = mode
self.category = category
self.save = save
self.scalers = {}
self.indexes = {}
self.index_types = {}
self._input_features = copy(input_features)
if save and h5py:
self.to_disk()
_DataSet.__init__(self, config=self.config, path=os.getcwd())
[docs] def init_paras(self) -> dict:
"""Returns the initializing parameters of this class"""
signature = inspect.signature(self.__init__)
init_paras = {}
for para in signature.parameters.values():
init_paras[para.name] = getattr(self, para.name)
return init_paras
@property
def ts_args(self):
return self._ts_args
@ts_args.setter
def ts_args(self, _ts_args: dict = None):
default_args = {'input_steps': 1,
'lookback': 1,
'forecast_len': 1,
'forecast_step': 0,
'known_future_inputs': False
}
if _ts_args:
default_args.update(_ts_args)
self._ts_args = default_args
@property
def lookback(self):
return self.ts_args['lookback']
@property
def classes(self):
_classes = []
if self.mode == 'classification':
if self.num_outs == 1: # for binary/multiclass
array = self.data[self._output_features].values
_classes = np.unique(array[~np.isnan(array)])
else: # for one-hot encoded
_classes = self._output_features
return _classes
@property
def num_classes(self):
return len(self.classes)
@property
def is_binary(self) -> bool:
"""Returns True if the porblem is binary classification"""
_default = False
if self.mode == 'classification':
if self.num_outs == 1:
array = self.data[self._output_features].values
unique_vals = np.unique(array[~np.isnan(array)])
if len(unique_vals) == 2:
_default = True
else:
pass # todo, check when output columns are one-hot encoded
return _default
@property
def is_multiclass(self) -> bool:
"""Returns True if the porblem is multiclass classification"""
_default = False
if self.mode == 'classification':
if self.num_outs == 1:
array = self.data[self._output_features].values
unique_vals = np.unique(array[~np.isnan(array)])
if len(unique_vals) > 2:
_default = True
else:
pass # todo, check when output columns are one-hot encoded
return _default
@property
def is_multilabel(self) -> bool:
"""Returns True if the porblem is multilabel classification"""
_default = False
if self.mode == 'classification':
if self.num_outs > 1:
_default = True
return _default
@property
def _to_categorical(self):
# whether we have to convert y into one-hot encoded form
_defualt = False
if self.is_binary or self.is_multiclass:
if self.num_outs == 1:
_defualt = True
# it seems sklearn can accept one-hot-encoded targets but xgb, lgbm and catboost can't
# but since since sklearn can also accept non-one-hot-encoded targets for multiclass
# let's not one-hot-encode for all ML algos
if self.category == 'ML':
_defualt = False
return _defualt
@property
def teacher_forcing(self):
return self._teacher_forcing
@teacher_forcing.setter
def teacher_forcing(self, x):
self._teacher_forcing = x
@property
def input_features(self):
_inputs = self.config['input_features']
if _inputs is None and self.data is not None:
assert isinstance(self.data, pd.DataFrame)
_inputs = self.data.columns[0:-1].to_list()
return _inputs
@property
def output_features(self):
"""for external use"""
_outputs = self.config['output_features']
if _outputs is None and self.data is not None:
# assert isinstance(self.data, pd.DataFrame)
if self.data.ndim == 2:
_outputs = [col for col in self.data.columns if col not in self.input_features]
else:
_outputs = [] # todo
return _outputs
@property
def _output_features(self):
"""for internal use"""
_outputs = deepcopy(self.config['output_features'])
if isinstance(self.data, list):
assert isinstance(_outputs, list)
elif isinstance(self.data, dict):
assert isinstance(_outputs, dict), f"""
data is of type dict while output_features are
of type {_outputs.__class__.__name__}"""
for k in self.data.keys():
if k not in _outputs:
_outputs[k] = []
elif _outputs is None and self.data is not None:
assert isinstance(self.data, pd.DataFrame)
_outputs = [col for col in self.data.columns if col not in self.input_features]
return _outputs
@property
def num_ins(self):
return len(self.input_features)
@property
def num_outs(self):
return len(self.output_features)
@property
def batch_dim(self):
default = "3D"
if self.ts_args['lookback'] == 1:
default = "2D"
return default
def _process_data(self,
data,
input_features,
output_features
):
if isinstance(data, str):
_source = self._get_data_from_str(data, input_features, output_features)
if isinstance(_source, str) and _source.endswith('.h5'):
self._from_h5 = True
elif isinstance(data, pd.DataFrame):
_source = self._get_data_from_df(data, input_features, output_features)
elif isinstance(data, np.ndarray):
_source = self._get_data_from_ndarray(data, input_features, output_features)
elif data.__class__.__name__ == "Dataset":
_source = data
elif isinstance(data, list):
raise ValueError(f"""
data is given as a list. For such cases either use DataSetUnion
or DataSetPipeline insteadd of DataSet class""")
elif isinstance(data, dict):
raise ValueError(f"""
data is given as a dictionary. For such cases either use DataSetUnion
or DataSetPipeline insteadd of DataSet class""")
elif data is None:
return data
else:
assert data is not None
raise ValueError(f"""
unregnizable source of data of type {data.__class__.__name__} given
""")
_source = self.impute(_source)
return _source
def _get_data_from_ndarray(self, data, input_features, output_features):
if data.ndim == 2:
# if output_features is not defined, consider 1 output and name it
# as 'output'
if output_features is None:
output_features = ['outout']
self.config['output_features'] = output_features # we should put it in config as well
elif isinstance(output_features, str):
output_features = [output_features]
else:
assert isinstance(output_features, list)
if input_features is None: # define dummy names for input_features
input_features = [f'input_{i}' for i in range(data.shape[1] - len(output_features))]
self.config['input_features'] = input_features
return pd.DataFrame(data, columns=input_features + output_features)
else:
return data
def _get_data_from_df(self, data, input_features, output_features):
if input_features is None and output_features is not None:
if isinstance(output_features, str):
output_features = [output_features]
assert isinstance(output_features, list)
input_features = [col for col in data.columns if col not in output_features]
# since we have inferred the input_features, they should be put
# back into config
self.config['input_features'] = input_features
return data
def _get_data_from_str(self, data, input_features, output_features):
if isinstance(output_features, str):
output_features = [output_features]
# dir path/file path/ ai4water dataset name
if data.endswith('.h5'):
_source = data
if data.endswith('.csv'):
_source = pd.read_csv(data)
if _source.columns[0] in ['index', 'time', 'date']:
_source.index = pd.to_datetime(_source.pop('index'))
elif data.endswith('.xlsx') or data.endswith('xlx'):
_source = pd.read_excel(data)
if _source.columns[0] in ['index', 'time', 'date']:
_source.index = pd.to_datetime(_source.pop('index'))
elif data.endswith('.parquet'):
_source = pd.read_parquet(data)
elif data.endswith('.feather'):
_source = pd.read_feather(data)
if _source.columns[0] in ['index', 'time', 'date']:
_source.index = pd.to_datetime(_source.pop('index'))
# netcdf file
elif data.endswith('.nc'):
import xarray as xr
_source = xr.open_dataset(data)
_source = _source.to_dataframe()
elif data.endswith('npz'):
data = np.load(data)
assert len(data) == 1
d = []
for k, v in data.items():
d.append(v)
data: np.ndarray = d[0]
_source = pd.DataFrame(data, columns=input_features + output_features)
# matlab's mat file
elif data.endswith('.mat'):
import scipy
mat = scipy.io.loadmat(data)
data: np.ndarray = mat['data']
_source = pd.DataFrame(data, columns=input_features + output_features)
elif os.path.isfile(data):
assert os.path.exists(data)
_source = data
elif os.path.isdir(data):
assert len(os.listdir(data)) > 1
# read from directory
raise NotImplementedError
elif data in all_datasets:
_source = self._get_data_from_ai4w_datasets(data)
else:
raise ValueError(f"unregnizable source of data given {data}")
return _source
def _get_data_from_ai4w_datasets(self, data):
Dataset = getattr(datasets, data)
dataset = Dataset()
dataset_args = self.dataset_args
if dataset_args is None:
dataset_args = {}
# if self.config['input_features'] is not None:
dynamic_features = self.input_features + self.output_features
data = dataset.fetch(dynamic_features=dynamic_features,
**dataset_args)
data = data.to_dataframe(['time', 'dynamic_features']).unstack()
data.columns = [a[1] for a in data.columns.to_flat_index()]
return data
[docs] def impute(self, data):
"""Imputes the missing values in the data using `Imputation` module"""
if self.nan_filler is not None:
if isinstance(data, pd.DataFrame):
_source = self._impute(data, self.nan_filler)
else:
raise NotImplementedError
else:
_source = data
return _source
def _impute(self, data, impute_config):
if isinstance(impute_config, str):
method, impute_args = impute_config, {}
data = Imputation(data, method=method, **impute_args)()
elif isinstance(impute_config, dict):
data = Imputation(data, **impute_config)()
elif isinstance(impute_config, list):
for imp_conf in impute_config:
data = Imputation(data, **imp_conf)()
else:
raise NotImplementedError(f'{impute_config.__class__.__name__}')
return data
[docs] def get_indices(self):
"""If the data is to be divded into train/test based upon indices,
here we create train_indices and test_indices. The train_indices
contain indices for both training and validation data.
"""
tot_obs = self.total_exs(**self.ts_args)
all_indices = np.arange(tot_obs)
if len(self.indices) == 0:
if self.train_fraction < 1.0:
if self.split_random:
train_indices, test_indices = train_test_split(
all_indices,
train_size=self.train_fraction,
random_state=self.seed
)
else:
train_indices, test_indices = self._get_indices_by_seq_split(
all_indices,
self.train_fraction)
else: # no test data
train_indices, test_indices = all_indices, []
else:
_train_indices = self.indices.get('training', None)
_val_indices = self.indices.get('validation', None)
_test_indices = self.indices.get('test', None)
if _train_indices is not None:
if _val_indices is None:
# even if val_fraction is > 0.0, we will separate validation
# data from training later
_val_indices = np.array([]) # no validation set
else:
assert isinstance(np.array(_val_indices), np.ndarray)
_val_indices = np.array(_val_indices)
overlap = np.intersect1d(_train_indices, _val_indices)
assert len(overlap) == 0, f"""
Training and validation indices must be mutually exclusive.
They contain {len(overlap)} overlaping values."""
train_indices = np.sort(np.hstack([_train_indices, _val_indices]))
if _test_indices is None:
# get test_indices by subtracting train_indices from all indices
test_indices = [ind for ind in all_indices if ind not in train_indices]
# _val_indices = np.array([])
else: # todo
train_indices = []
setattr(self, 'train_indices', train_indices)
setattr(self, 'test_indices', test_indices)
return np.array(train_indices).astype("int32"), np.array(test_indices).astype("int32")
def _get_indices_by_seq_split(
self,
all_indices: Union[list, np.ndarray],
train_fraction):
""" sequential train/test split"""
train_indices = all_indices[0:int(train_fraction * len(all_indices))]
test_indices = all_indices[int(train_fraction * len(all_indices)):]
return train_indices, test_indices
def _training_data(self, key="_training", **kwargs):
"""training data including validation data"""
train_indices, test_indices = self.get_indices()
if 'validation' in self.indices:
# when validation indices are given, we first prepare
# complete data which contains training, validation and test data
# TODO this is agains function definition
indices = np.sort(np.hstack([train_indices, test_indices]))
else:
indices = train_indices
data = self.data.copy()
# numpy arrays are not indexed and is supposed that the whole array is
# use as input
if not isinstance(data, np.ndarray):
data = self.indexify(data, key)
# get x,_y, y
x, prev_y, y = self._make_data(
data,
intervals=self.intervals,
indices=indices,
**kwargs)
if not isinstance(self.data, np.ndarray):
x, self.indexes[key] = self.deindexify(x, key)
if self.mode == 'classification':
y = check_for_classification(y, self._to_categorical)
return x, prev_y, y
[docs] def training_data(self, key="train", **kwargs):
"""training data excluding validation data"""
if getattr(self, '_from_h5', False):
return load_data_from_hdf5('training_data', self.data)
x, prev_y, y = self._training_data(key=key, **kwargs)
if self.val_fraction > 0.0:
# when no output is generated, corresponding index will not be saved
idx = self.indexes.get(key, np.arange(len(x))) # index also needs to be split
x, prev_y, y, idx = self._train_val_split(x, prev_y, y, idx, 'training')
# if drop remainder, we need to
x, prev_y, y = self.check_for_batch_size(x, prev_y, y)
self.indexes[key] = idx[0:len(x)]
if self.teacher_forcing:
return self.return_x_yy(x, prev_y, y, "Training")
return self.return_xy(x, y, "Training")
[docs] def validation_data(self, key="val", **kwargs):
"""validation data"""
if getattr(self, '_from_h5', False):
return load_data_from_hdf5('validation_data', self.data)
x, prev_y, y = self._training_data(key=key, **kwargs)
if self.val_fraction > 0.0:
idx = self.indexes.get(key, np.arange(len(x)))
x, prev_y, y, idx = self._train_val_split(x, prev_y, y, idx, 'validation')
x, prev_y, y = self.check_for_batch_size(x, prev_y, y)
self.indexes[key] = idx[0:len(x)]
else:
x, prev_y, y = np.empty(0), np.empty(0), np.empty(0)
if self.teacher_forcing:
return self.return_x_yy(x, prev_y, y, "Validation")
return self.return_xy(x, y, "Validation")
def _train_val_split(self, x, prev_y, y, idx, return_type):
"""split x,y,idx,prev_y into training and validation data"""
if self.split_random:
# split x,y randomly
splitter = TrainTestSplit(test_fraction=self.val_fraction, seed=self.seed)
train_x, val_x, train_y, val_y = splitter.split_by_random(x, y)
splitter = TrainTestSplit(test_fraction=self.val_fraction, seed=self.seed)
train_idx, val_idx, train_prev_y, val_prev_y = splitter.split_by_random(
idx, prev_y)
elif 'validation' in self.indices:
# separate indices were provided for validation data
# it must be remembered that x,y now contains training+validation+test data
# but based upon indices, we will choose either training or validation data
val_indices = self.indices['validation']
_train_indices, _ = self.get_indices()
train_indices = [i for i in _train_indices if i not in val_indices]
splitter = TrainTestSplit(train_indices=train_indices, test_indices=val_indices)
train_x, val_x, train_y, val_y = splitter.split_by_indices(
x, y
)
splitter = TrainTestSplit(train_indices=train_indices, test_indices=val_indices)
train_idx, val_idx, train_prev_y, val_prev_y = splitter.split_by_indices(
idx, prev_y)
else:
# split x,y sequentially
splitter = TrainTestSplit(test_fraction=self.val_fraction)
train_x, val_x, train_y, val_y = splitter.split_by_slicing(x, y)
splitter = TrainTestSplit(test_fraction=self.val_fraction)
train_idx, val_idx, train_prev_y, val_prev_y = splitter.split_by_slicing(idx, prev_y)
if return_type == "training":
return train_x, train_prev_y, train_y, train_idx
return val_x, val_prev_y, val_y, val_idx
[docs] def test_data(self, key="test", **kwargs):
"""test data"""
if getattr(self, '_from_h5', False):
return load_data_from_hdf5('test_data', self.data)
if self.train_fraction < 1.0:
data = self.data.copy()
# numpy arrays are not indexed and is supposed that the whole array
# is use as input
if not isinstance(data, np.ndarray):
data = self.indexify(data, key)
_, test_indices = self.get_indices()
if len(test_indices) > 0: # it is possible that training and validation
# indices cover whole data
# get x,_y, y
x, prev_y, y = self._make_data(
data,
intervals=self.intervals,
indices=test_indices,
**kwargs)
x, prev_y, y = self.check_for_batch_size(x, prev_y, y)
if not isinstance(self.data, np.ndarray):
x, self.indexes[key] = self.deindexify(x, key)
if self.mode == 'classification':
y = check_for_classification(y, self._to_categorical)
else:
x, prev_y, y = np.empty(0), np.empty(0), np.empty(0)
else:
x, prev_y, y = np.empty(0), np.empty(0), np.empty(0)
if self.teacher_forcing:
return self.return_x_yy(x, prev_y, y, "Test")
return self.return_xy(x, y, "Test")
[docs] def check_for_batch_size(self, x, prev_y=None, y=None):
if self.drop_remainder:
assert isinstance(x, np.ndarray)
remainder = len(x) % self.batch_size
if remainder:
x = x[0:-remainder]
if prev_y is not None:
prev_y = prev_y[0:-remainder]
if y is not None:
y = y[0:-remainder]
return x, prev_y, y
[docs] def check_nans(self, data, input_x, input_y, label_y):
"""Checks whether anns are present or not and checks shapes of arrays
being prepared."""
if isinstance(data, pd.DataFrame):
nans = data[self.output_features].isna()
nans = nans.sum().sum()
data = data.values
else:
nans = np.isnan(data[:, -self.num_outs:])
# df[self.out_cols].isna().sum()
nans = int(nans.sum())
if nans > 0:
if self.allow_nan_labels == 2:
if self.verbosity > 0: print("""
\n{} Allowing NANs in predictions {}\n""".format(10 * '*', 10 * '*'))
elif self.allow_nan_labels == 1:
if self.verbosity > 0: print("""
\n{} Ignoring examples whose all labels are NaNs {}\n
""".format(10 * '*', 10 * '*'))
idx = ~np.array([all([np.isnan(x) for x in label_y[i]]) for i in range(len(label_y))])
input_x = input_x[idx]
input_y = input_y[idx]
label_y = label_y[idx]
if int(np.isnan(data[:, -self.num_outs:][0:self.lookback]).sum() / self.num_outs) >= self.lookback:
self.nans_removed_4m_st = -9999
else:
if self.verbosity > 0:
print('\n{} Removing Examples with nan in labels {}\n'.format(10 * '*', 10 * '*'))
if self.num_outs == 1:
# find out how many nans were present from start of data until
# lookback, these nans will be removed
self.nans_removed_4m_st = np.isnan(data[:, -self.num_outs:][0:self.lookback]).sum()
# find out such labels where 'y' has at least one nan
nan_idx = np.array([np.any(i) for i in np.isnan(label_y)])
non_nan_idx = np.invert(nan_idx)
label_y = label_y[non_nan_idx]
input_x = input_x[non_nan_idx]
input_y = input_y[non_nan_idx]
assert np.isnan(label_y).sum() < 1, """
label still contains {} nans""".format(np.isnan(label_y).sum())
assert input_x.shape[0] == input_y.shape[0] == label_y.shape[0], """
shapes are not same"""
if not self.allow_input_nans:
assert np.isnan(input_x).sum() == 0, """input still contains {} nans
""".format(np.isnan(input_x).sum())
return input_x, input_y, label_y
[docs] def indexify(self, data: pd.DataFrame, key):
data = data.copy()
dummy_index = False
# for dataframes
if isinstance(data.index, pd.DatetimeIndex):
index = list(map(int, np.array(data.index.strftime('%Y%m%d%H%M'))))
# datetime index
self.index_types[key] = 'dt'
original_index = pd.Series(index, index=index)
else:
try:
index = list(map(int, np.array(data.index)))
self.index_types[key] = 'int'
original_index = pd.Series(index, index=index)
except ValueError: # index may not be convertible to integer, it may be
# string values
dummy_index = np.arange(len(data), dtype=np.int64)
original_index = pd.Series(data.index, index=dummy_index)
index = dummy_index
self.index_types[key] = 'str'
self.indexes[key] = {'dummy': dummy_index,
'original': original_index}
# pandas will add the 'datetime' column as first column.
# This columns will only be used to keep
# track of indices of train and test data.
data.insert(0, 'index', index)
self._input_features = ['index'] + self.input_features
# setattr(self, 'input_features', ['index'] + self.input_features)
self.indexes[key] = {'index': index, 'dummy_index': dummy_index,
'original': original_index}
return data
[docs] def deindexify(self, data: np.ndarray, key):
_data, _index = self.deindexify_nparray(data, key)
if self.indexes[key].get('dummy_index', None) is not None:
_index = self.indexes[key]['original'].loc[_index].values
if self.index_types[key] == 'dt':
_index = to_datetime_index(_index)
return _data, _index
[docs] def get_batches(self, data):
if self.batch_dim == "2D":
return self.get_2d_batches(data)
else:
return self.check_nans(data, *prepare_data(data,
num_outputs=self.num_outs,
**self.ts_args))
[docs] def get_2d_batches(self, data):
# need to count num_ins based upon _input_features as it consider index
num_ins = len(self._input_features)
if not isinstance(data, np.ndarray):
if isinstance(data, pd.DataFrame):
data = data.values
else:
raise TypeError(f"unknown data type {data.__class__.__name__} for data ")
if self.num_outs > 0:
input_x = data[:, 0:num_ins]
input_y, label_y = data[:, -self.num_outs:], data[:, -self.num_outs:]
else:
dummy_input_y = np.random.random((len(data), self.num_outs))
dummy_y = np.random.random((len(data), self.num_outs))
input_x, input_y, label_y = data[:, 0:num_ins], dummy_input_y, dummy_y
assert self.lookback == 1, """
lookback should be one for MLP/Dense layer based model, but it is {}
""".format(self.lookback)
return self.check_nans(data, input_x, input_y, np.expand_dims(label_y, axis=2))
def _make_data(self, data, indices=None, intervals=None, shuffle=False):
# if indices is not None:
# indices = np.array(indices).astype("int32")
# assert isinstance(np.array(indices), np.ndarray), "indices must be array like"
if isinstance(data, pd.DataFrame):
data = data[self._input_features + self.output_features].copy()
df = data
else:
data = data.copy()
df = data
if intervals is None:
x, prev_y, y = self.get_batches(df)
if indices is not None:
# if indices are given then this should be done after `get_batches`
# method
x = x[indices]
prev_y = prev_y[indices]
y = y[indices]
else:
xs, prev_ys, ys = [], [], []
for _st, _en in intervals:
df1 = data[_st:_en]
if df1.shape[0] > 0:
x, prev_y, y = self.get_batches(df1.values)
xs.append(x)
prev_ys.append(prev_y)
ys.append(y)
if indices is None:
x = np.vstack(xs)
prev_y = np.vstack(prev_ys)
y = np.vstack(ys)
else:
x = np.vstack(xs)[indices]
prev_y = np.vstack(prev_ys)[indices]
y = np.vstack(ys)[indices]
if shuffle:
raise NotImplementedError
if isinstance(data, pd.DataFrame) and 'index' in data:
data.pop('index')
if self.ts_args['forecast_len'] == 1 and len(self.output_features) > 0:
y = y.reshape(-1, len(self.output_features))
return x, prev_y, y
[docs] def deindexify_nparray(self, data, key):
if data.ndim == 3:
_data, index = data[..., 1:].astype(np.float32), data[:, -1, 0]
elif data.ndim == 2:
_data, index = data[..., 1:].astype(np.float32), data[:, 0]
elif data.ndim == 4:
_data, index = data[..., 1:].astype(np.float32), data[:, -1, -1, 0]
elif data.ndim == 5:
_data, index = data[..., 1:].astype(np.float32), data[:, -1, -1, -1, 0]
else:
raise NotImplementedError
if self.index_types[key] != 'str':
index = np.array(index, dtype=np.int64)
return _data, index
[docs] def total_exs(self,
lookback,
forecast_step=0, forecast_len=1,
**ts_args
):
intervals = self.intervals
input_steps = self.ts_args['input_steps']
data = consider_intervals(self.data, intervals)
num_outs = len(self.output_features) if self.output_features is not None else None
max_tot_obs = 0
if not self.allow_nan_labels and intervals is None:
_data = data[self.input_features + self.output_features] if isinstance(data, pd.DataFrame) else data
x, _, _ = prepare_data(_data,
lookback, num_outputs=num_outs,
forecast_step=forecast_step,
forecast_len=forecast_len, mask=np.nan, **ts_args)
max_tot_obs = len(x)
# we need to ignore some values at the start
more = (lookback * input_steps) - 1
if isinstance(data, np.ndarray):
return len(data) - more
# todo, why not when allow_nan_labels>0?
if forecast_step > 0:
more += forecast_step
if forecast_len > 1:
more += forecast_len
if intervals is None: intervals = [()]
more *= len(intervals)
if self.allow_nan_labels == 2:
tot_obs = data.shape[0] - more
elif self.allow_nan_labels == 1:
label_y = data[self.output_features].values
idx = ~np.array([all([np.isnan(x) for x in label_y[i]]) for i in range(len(label_y))])
tot_obs = np.sum(idx) - more
else:
if num_outs == 1:
tot_obs = data.shape[0] - int(data[self.output_features].isna().sum()) - more
tot_obs = max(tot_obs, max_tot_obs)
else:
# count by droping all the rows when nans occur in output features
tot_obs = len(data.dropna(subset=self.output_features))
tot_obs -= more
return tot_obs
[docs] def KFold_splits(self, n_splits=5):
"""returns an iterator for kfold cross validation.
The iterator yields two tuples of training and test x,y pairs.
The iterator on every iteration returns following
`(train_x, train_y), (test_x, test_y)`
Note: only `training_data` and `validation_data` are used to make kfolds.
Example
---------
>>> import numpy as np
>>> import pandas as pd
>>> from ai4water.preprocessing import DataSet
>>> data = pd.DataFrame(np.random.randint(0, 10, (20, 3)), columns=['a', 'b', 'c'])
>>> data_set = DataSet(data=data)
>>> kfold_splits = data_set.KFold_splits()
>>> for (train_x, train_y), (test_x, test_y) in kfold_splits:
... print(train_x, train_y, test_x, test_y)
"""
if self.teacher_forcing:
warnings.warn("Ignoring prev_y")
x, _, y = self._training_data()
kf = KFold(n_splits=n_splits,
random_state=self.seed if self.shuffle else None,
shuffle=self.shuffle)
spliter = kf.split(x)
for tr_idx, test_idx in spliter:
yield (x[tr_idx], y[tr_idx]), (x[test_idx], y[test_idx])
[docs] def LeaveOneOut_splits(self):
"""Yields leave one out splits
The iterator on every iteration returns following
`(train_x, train_y), (test_x, test_y)`"""
if self.teacher_forcing:
warnings.warn("Ignoring prev_y")
x, _, y = self._training_data()
kf = LeaveOneOut()
for tr_idx, test_idx in kf.split(x):
yield (x[tr_idx], y[tr_idx]), (x[test_idx], y[test_idx])
[docs] def ShuffleSplit_splits(self, **kwargs):
"""Yields ShuffleSplit splits
The iterator on every iteration returns following
`(train_x, train_y), (test_x, test_y)`"""
if self.teacher_forcing:
warnings.warn("Ignoring prev_y")
x, _, y = self._training_data()
sf = ShuffleSplit(**kwargs)
for tr_idx, test_idx in sf.split(x):
yield (x[tr_idx], y[tr_idx]), (x[test_idx], y[test_idx])
[docs] def TimeSeriesSplit_splits(self, n_splits=5, **kwargs):
"""returns an iterator for TimeSeriesSplit.
The iterator on every iteration returns following
`(train_x, train_y), (test_x, test_y)`
"""
if self.teacher_forcing:
warnings.warn("Ignoring prev_y")
x, _, y = self._training_data()
tscv = TimeSeriesSplit(n_splits=n_splits, **kwargs)
for tr_idx, test_idx in tscv.split(x):
yield (x[tr_idx], y[tr_idx]), (x[test_idx], y[test_idx])
[docs] def plot_KFold_splits(self, n_splits=5, show=True, **kwargs):
"""Plots the indices of kfold splits"""
if self.teacher_forcing:
warnings.warn("Ignoring prev_y")
x, _, y = self._training_data()
kf = KFold(n_splits=n_splits,
random_state=self.seed if self.shuffle else None,
shuffle=self.shuffle)
spliter = kf.split(x)
self._plot_splits(spliter, x, title="KFoldCV", show=show, **kwargs)
return
[docs] def plot_LeaveOneOut_splits(self, show=True, **kwargs):
"""Plots the indices obtained from LeaveOneOut strategy"""
if self.teacher_forcing:
warnings.warn("Ignoring prev_y")
x, _, y = self._training_data()
spliter = LeaveOneOut().split(x)
self._plot_splits(spliter=spliter,
x=x,
title="LeaveOneOutCV",
show=show,
**kwargs)
return
[docs] def plot_TimeSeriesSplit_splits(self, n_splits=5, show=True, **kwargs):
"""Plots the indices obtained from TimeSeriesSplit strategy"""
if self.teacher_forcing:
warnings.warn("Ignoring prev_y")
x, _, y = self._training_data()
spliter = TimeSeriesSplit(n_splits=n_splits, **kwargs).split(x)
self._plot_splits(spliter=spliter,
x=x,
title="TimeSeriesCV",
show=show,
**kwargs)
return
def _plot_splits(self, spliter, x, show=True, **kwargs):
splits = list(spliter)
figsize = kwargs.get('figsize', (10, 8))
legend_fs = kwargs.get('legend_fs', 20)
legend_pos = kwargs.get('legend_pos', (1.02, 0.8))
title = kwargs.get("title", "CV")
plt.close('all')
fig = plt.figure(figsize=figsize)
ax = fig.add_subplot(111)
for ii, split in enumerate(splits):
indices = np.array([np.nan] * len(x))
indices[split[0]] = 1
indices[split[1]] = 0
ax.scatter(range(len(indices)), [ii + .5] * len(indices),
c=indices, marker='_', lw=10, cmap="coolwarm",
vmin=-.2, vmax=1.2)
yticklabels = list(range(len(splits)))
ax.set(yticks=np.arange(len(splits)) + .5, yticklabels=yticklabels)
ax.set_xlabel("Sample Index", fontsize=18)
ax.set_ylabel("CV iteration", fontsize=18)
ax.set_title(title, fontsize=20)
ax.legend([Patch(color=cmap_cv(.8)), Patch(color=cmap_cv(.02))],
['Training', 'Test'],
loc=legend_pos, fontsize=legend_fs)
if show:
plt.tight_layout()
plt.show()
return
[docs] def to_disk(self, path: str = None):
import h5py
path = path or os.getcwd()
filepath = os.path.join(path, "data.h5")
f = h5py.File(filepath, mode='w')
for k, v in self.init_paras().items():
if isinstance(v, (dict, list, tuple, float, int, str)):
f.attrs[k] = json.dumps(
v, default=jsonize).encode('utf8')
elif v is not None and k != 'data':
f.attrs[k] = v
if self.teacher_forcing:
x, prev_y, y = self.training_data()
val_x, val_prev_y, val_y = self.validation_data()
test_x, test_prev_y, test_y = self.test_data()
else:
prev_y, val_prev_y, test_prev_y = np.empty(0), np.empty(0), np.empty(0)
x, y = self.training_data()
val_x, val_y = self.validation_data()
test_x, test_y = self.test_data()
# save in disk
self._save_data_to_hdf5('training_data', x, prev_y, y, f)
self._save_data_to_hdf5('validation_data', val_x, val_prev_y, val_y, f)
self._save_data_to_hdf5('test_data', test_x, test_prev_y, test_y, f)
f.close()
return
def _save_data_to_hdf5(self, data_type, x, prev_y, y, f):
"""Saves one data_type in h5py. data_type is string indicating whether
it is training, validation or test data."""
assert x is not None
group_name = f.create_group(data_type)
container = {}
container['x'] = x
if self.teacher_forcing:
container['prev_y'] = prev_y
container['y'] = y
for name, val in container.items():
param_dset = group_name.create_dataset(name, val.shape, dtype=val.dtype)
if not val.shape:
# scalar
param_dset[()] = val
else:
param_dset[:] = val
return
[docs] @classmethod
def from_h5(cls, path):
"""Creates an instance of DataSet from .h5 file."""
import h5py
f = h5py.File(path, mode='r')
config = {}
for k, v in f.attrs.items():
if isinstance(v, str) or isinstance(v, bytes):
v = decode(v)
config[k] = v
cls._from_h5 = True
f.close()
# the data is already being loaded from h5 file so no need to save it again
# upon initialization of class
config['save'] = False
return cls(path, **config)