# Source code for ai4water.preprocessing.transformations._main
from typing import Union
from sklearn.utils.validation import assert_all_finite
from ai4water.backend import np, pd, plt, stats
from ai4water.backend import easy_mpl as em
from ai4water.utils.utils import dateandtime_now, deepcopy_dict_without_clone
from ._transformations import MinMaxScaler, PowerTransformer, QuantileTransformer, StandardScaler
from ._transformations import LogScaler, Log10Scaler, Log2Scaler, TanScaler, SqrtScaler, CumsumScaler
from ._transformations import FunctionTransformer, RobustScaler, MaxAbsScaler
from ._transformations import ParetoTransformer
from ._transformations import VastTransformer
from ._transformations import MmadTransformer
from ._transformations import Center
from ._transformations import HyperbolicTangentTransformer
from ._transformations import LogisticSigmoidTransformer
from .utils import InvalidTransformation, TransformerNotFittedError, SP_METHODS
# TODO add more transformers (logistic sigmoid and tanh are already available below).
# which transformation to use? Some related articles/posts
# https://scikit-learn.org/stable/modules/preprocessing.html
# http://www.faqs.org/faqs/ai-faq/neural-nets/part2/section-16.html
# https://data.library.virginia.edu/interpreting-log-transformations-in-a-linear-model/
class TransformationsContainer:
    """Minimal base class holding the state shared by transformation objects.

    Attributes
    ----------
    transformer_ :
        the underlying transformer object, ``None`` until one is assigned.
    transforming_straight : bool
        ``True`` while a forward transformation is the current direction.
    index :
        placeholder for an index, ``None`` by default.
    """

    def __init__(self):
        # every attribute starts from a neutral default
        self.index = None
        self.transforming_straight = True
        self.transformer_ = None
# Ready-made scaler instances keyed by method name. `Transformation.inverse_transform`
# looks a method up here when it has no transformer of its own and the method is
# listed in SP_METHODS.
INITIATED_TRANSFORMERS = {
    method_name: scaler_cls()
    for method_name, scaler_cls in (
        ('log', LogScaler),
        ('log2', Log2Scaler),
        ('log10', Log10Scaler),
        ('sqrt', SqrtScaler),
    )
}
class _Processor(object):
    """Pre-/post-processing that is applied around a transformation.

    ``preprocess`` optionally replaces zeros and turns negative values into
    their absolute values while remembering where they were; ``postprocess``
    puts zeros back and restores the sign at the remembered locations.

    The remembered locations (``zero_indices_`` and ``negative_indices_``) are
    dictionaries mapping the *positional* column index in the frame given to
    ``preprocess`` to an array of *positional* row indices, so that they can be
    used directly with ``DataFrame.iloc``.

    NOTE(review): ``postprocess`` therefore expects a frame whose column order
    is the same as that of the frame given to ``preprocess``.

    Arguments:
        replace_zeros : whether to replace zeros before transformation.
        replace_zeros_with : a number, or one of 'mean', 'max', 'min'.
        treat_negatives : whether to transform absolute values of negatives.
        features : columns to process. If None (or empty), all columns are used.
    """

    def __init__(self,
                 replace_zeros,
                 replace_zeros_with,
                 treat_negatives,
                 features=None
                 ):
        self.replace_zeros = replace_zeros
        self.replace_zeros_with = replace_zeros_with
        self.treat_negatives = treat_negatives
        self.features = features
        self.index = None

    def preprocess(self, data, transforming_straight=True):
        """Makes sure that data is dataframe and optionally replaces zeros
        and negative values.

        If `data` is already a DataFrame it is modified in place, so callers
        who need the original should pass a copy.
        """
        data = to_dataframe(data)

        # save the index if not already saved so that can be used later
        if self.index is None:
            self.index = data.index

        columns = self.features or data.columns

        if self.replace_zeros and transforming_straight:
            zero_indices = {}
            for col in columns:
                is_zero = (data[col] == 0.0).to_numpy()
                if not is_zero.any():
                    continue
                # Remember positions in the full frame (not labels and not the
                # position within `features`) because they are consumed with
                # `iloc` on the full frame later on.
                zero_indices[data.columns.get_loc(col)] = np.flatnonzero(is_zero)

                if self.replace_zeros_with in ['mean', 'max', 'min']:
                    replace_with = float(getattr(np, 'nan' + self.replace_zeros_with)(data[col]))
                else:
                    replace_with = self.replace_zeros_with
                # a boolean mask touches exactly the zero rows even when the
                # index contains duplicated labels
                data.loc[is_zero, col] = get_val(data[col], replace_with)

            self.zero_indices_ = zero_indices

        if self.treat_negatives:
            negative_indices = {}
            for col in columns:
                is_negative = (data[col] < 0.0).to_numpy()
                if not is_negative.any():
                    continue
                negative_indices[data.columns.get_loc(col)] = np.flatnonzero(is_negative)
                # turn -ve values into positives
                data[col] = data[col].abs()

            self.negative_indices_ = negative_indices

        return data

    def postprocess(self, data):
        """If zeros/negatives were replaced during ``preprocess``, put zeros
        back and restore the -ve sign. Works on a copy of `data`."""
        data = data.copy()

        if self.replace_zeros:
            for col, rows in getattr(self, 'zero_indices_', {}).items():
                data.iloc[rows, col] = 0.0

        if self.treat_negatives:
            for col, rows in getattr(self, 'negative_indices_', {}).items():
                # invert the sign of those values which were originally -ve;
                # plain arrays are assigned so that no index alignment happens
                data.iloc[rows, col] = -data.iloc[rows, col].to_numpy()

        return data
class Transformation(TransformationsContainer):
    """
    Applies transformation to tabular data. It is also possible to apply transformation
    on some selected features/columns of data. This class also performs some optional
    pre-processing on data before applying transformation on it.
    Any new transforming methods should define two methods one starting with
    `transform_with_` and `inverse_transform_with_`

    Currently following methods are available for transformation and inverse transformation

    Transformation methods
        - ``minmax``
        - ``maxabs``
        - ``robust``
        - ``power`` same as yeo-johnson
        - ``yeo-johnson`` power transformation using Yeo-Johnson method
        - ``box-cox``  power transformation using box-cox method
        - ``zscore``    also known as standard scalers
        - ``scale``    division by standard deviation
        - ``center``   by subtracting mean
        - ``quantile``
        - ``quantile_normal`` quantile with normal distribution as target
        - ``log``      natural logarithmic
        - ``log10``    log with base 10
        - ``log2``  log with base 2
        - ``sqrt``  square root
        - ``tan``      tangent
        - ``cumsum``   cumulative sum
        - ``mmad``  median and median absolute deviation
        - ``pareto``
        - ``vast`` Variable Stability Scaling
        - ``sigmoid`` logistic sigmoid
        - ``tanh`` hyperbolic tangent

    To transform a dataframe using any of the above methods use

    Examples:
        >>> from ai4water.preprocessing import Transformation
        >>> transformer = Transformation(method='zscore')
        >>> transformer.fit_transform(data=[1,2,3,5])

        or

        >>> transformer = Transformation(method='minmax')
        >>> normalized_df = transformer.fit_transform(data=pd.DataFrame([1,2,3]))

        >>> transformer = Transformation(method='log', replace_zeros=True)
        >>> trans_df, proc = transformer.fit_transform(data=pd.DataFrame([1,0,2,3]),
        ...                                            return_proc=True)
        >>> detransfomred_df = transformer.inverse_transform(trans_df, postprocessor=proc)

        or using one liner

        >>> normalized_df = Transformation(method='minmax',
        ...                       features=['a'])(data=pd.DataFrame([[1,2],[3,4], [5,6]],
        ...                                       columns=['a', 'b']))

    where ``method`` can be any of the above mentioned methods.

    Note
    ------
     ``tan``, ``tanh``, ``sigmoid`` and ``cumsum`` do not return original data upon
     inverse transformation.

    .. _google:
        https://developers.google.com/machine-learning/data-prep/transform/normalization
    """

    # maps the name of a method to the class which implements it
    available_transformers = {
        "minmax": MinMaxScaler,
        "zscore": StandardScaler,
        "center": Center,
        "scale": StandardScaler,
        "robust": RobustScaler,
        "maxabs": MaxAbsScaler,
        "power": PowerTransformer,
        "yeo-johnson": PowerTransformer,
        "box-cox": PowerTransformer,
        "quantile": QuantileTransformer,
        "quantile_normal": QuantileTransformer,
        "log": LogScaler,
        "log10": Log10Scaler,
        "log2": Log2Scaler,
        "sqrt": SqrtScaler,
        "tan": TanScaler,
        "cumsum": CumsumScaler,
        "vast": VastTransformer,
        "pareto": ParetoTransformer,
        "mmad": MmadTransformer,
        "sigmoid": LogisticSigmoidTransformer,
        "tanh": HyperbolicTangentTransformer,
    }

    def __init__(self,
                 method: str = 'minmax',
                 features: list = None,
                 replace_zeros: bool = False,
                 replace_zeros_with: Union[str, int, float] = 1,
                 treat_negatives: bool = False,
                 **kwargs
                 ):
        """
        Arguments:
            method : method by which to transform and consequently inversely
                transform the data. default is 'minmax'. see `Transformation.available_transformers`
                for full list.
            features : string or list of strings. Only applicable if `data` is
                dataframe. It defines the columns on which we want to apply transformation.
                The remaining columns will remain same/unchanged.
            replace_zeros : If true, then setting this argument to True will replace
                the zero values in data with some fixed value `replace_zeros_with`
                before transformation. The zero values will be put back at their
                places after transformation so this replacement/implacement is
                done only to avoid error during transformation for example during Box-Cox.
            replace_zeros_with : if replace_zeros is True, then this value will be used
                to replace zeros in dataframe before doing transformation. You can
                define the method with which to replace zeros for example by setting
                this argument to 'mean' will replace zeros with 'mean' of the
                array/column which contains zeros. Allowed string values are
                'mean', 'max', 'min'. see_
            treat_negatives:
                If true, and if data contains negative values, then the absolute
                values of these negative values will be considered for transformation.
                After transformation and inverse transformation, the sign of the
                values which were originally -ve is inverted again. This option
                is necessary for log, sqrt and box-cox transformations with -ve
                values in data.
            kwargs : any arguments which are to be provided to transformer on
                INITIALIZATION and not during transform or inverse transform

        Example:
            >>> from ai4water.preprocessing.transformations import Transformation
            >>> from ai4water.datasets import busan_beach
            >>> df = busan_beach()
            >>> inputs = ['tide_cm', 'wat_temp_c', 'sal_psu', 'air_temp_c', 'pcp_mm', 'pcp3_mm']
            >>> transformer = Transformation(method='minmax', features=['sal_psu', 'air_temp_c'])
            >>> new_data = transformer.fit_transform(df[inputs])

            Following shows how to apply log transformation on an array containing zeros
            by making use of the argument `replace_zeros`. The zeros in the input array
            will be replaced internally but will be inserted back afterwards.

            >>> from ai4water.preprocessing.transformations import Transformation
            >>> transformer = Transformation(method='log', replace_zeros=True)
            >>> transformed_data = transformer.fit_transform([1,2,3,0.0, 5, np.nan, 7])
            ... [0.0, 0.6931, 1.0986, 0.0, 1.609, None, 1.9459]
            >>> original_data = transformer.inverse_transform(data=transformed_data)

        .. _see:
            https://stats.stackexchange.com/a/222237/338323
        """
        super().__init__()

        if method not in self.available_transformers.keys():
            raise InvalidTransformation(method, list(self.available_transformers.keys()))

        self.method = method
        self.replace_zeros = replace_zeros
        self.replace_zeros_with = replace_zeros_with
        self.treat_negatives = treat_negatives
        self.features = features
        self.kwargs = kwargs
        self.transformed_features = None

        if self.transformer_ is None:  # self.transformer_ can be set during from_config
            # arguments implied by the name of the method
            _kwargs = {}
            if self.method == "scale":
                _kwargs['with_mean'] = False
            elif self.method == "box-cox":
                _kwargs['method'] = "box-cox"
            elif self.method == "quantile_normal":
                _kwargs["output_distribution"] = "normal"

            # a value given explicitly by the user takes precedence
            init_kwargs = {**_kwargs, **kwargs}

            self.transformer_ = self.get_transformer()(**init_kwargs)

    def __call__(self, data, what="fit_transform", return_proc=False, **kwargs):
        """
        Calls the `fit_transform` and `inverse_transform` methods.

        `what` starting with "fit" calls `fit_transform` while `what` starting
        with "inv" calls `inverse_transform`. Anything else raises ValueError.
        """
        if what.startswith("fit"):
            self.transforming_straight = True
            return self.fit_transform(data, return_proc=return_proc, **kwargs)

        elif what.startswith("inv"):
            self.transforming_straight = False
            return self.inverse_transform(data, **kwargs)

        else:
            raise ValueError(f"The class Transformation can not be called with keyword argument 'what'={what}")

    @property
    def features(self):
        return self._features

    @features.setter
    def features(self, x):
        if x is not None:
            assert len(x) == len(set(x)), f"duplicated features are not allowed. Features are: {x}"
        self._features = x

    @property
    def transformed_features(self):
        return self._transformed_features

    @transformed_features.setter
    def transformed_features(self, x):
        self._transformed_features = x

    @property
    def num_features(self):
        return len(self.features)

    def get_transformer(self):
        """returns the class which implements ``self.method``"""
        return self.available_transformers[self.method]

    def _upcast_for_power(self, to_transform):
        """power transformers sometimes overflow with small (e.g. float32) data
        which causes inf error, therefore such data is converted to float64.
        # a = np.array([87.52, 89.41, 89.4, 89.23, 89.92], dtype=np.float32).reshape(-1,1)
        """
        if self.method in ['power', 'yeo-johnson', 'box-cox']:
            return to_transform.astype("float64")
        return to_transform

    def _preprocess(self, data):
        """Pre-processes a copy of `data` and returns the columns which are to be
        transformed together with the processor which did the pre-processing."""
        self.transforming_straight = True

        proc = _Processor(self.replace_zeros,
                          self.replace_zeros_with,
                          self.treat_negatives,
                          features=self.features
                          )
        # converting before copying makes inputs without a `copy` method
        # (e.g. tuples) work and guarantees that the input is not modified
        data = proc.preprocess(to_dataframe(data).copy())

        if self.features is None:
            self.features = list(data.columns)

        self.initial_shape_ = data.shape

        to_transform = self.get_features(data)

        if self.method.lower() in ["log", "log10", "log2"]:
            if (to_transform.values < 0).any():
                raise InvalidValueError(self.method, "negative")

        return to_transform, proc

    def fit(self, data, **kwargs):
        """fits the data according the transformation methods."""
        to_transform, proc = self._preprocess(data)
        to_transform = self._upcast_for_power(to_transform)

        return self.transformer_.fit(to_transform.values, **kwargs)

    def transform(self, data, return_proc=False, **kwargs):
        """transforms the data according to fitted transformers."""
        original_data = to_dataframe(data).copy()

        to_transform, proc = self._preprocess(data)
        to_transform = self._upcast_for_power(to_transform)

        data = self.transformer_.transform(to_transform.values, **kwargs)

        return self._postprocess(data, to_transform, original_data, proc, return_proc)

    def fit_transform(self, data, return_proc=False, **kwargs):
        """
        Transforms the data

        Arguments:
            data : a dataframe or numpy ndarray or array like. The transformed
                data is returned as a dataframe which has the same index as
                data (in case data is dataframe). The shape of
                `data` is supposed to be (num_examples, num_features).
            return_proc : whether to return the processor or not. If True, then a
                tuple is returned which consists of transformed data and second is the preprocessor.
            kwargs :
        """
        original_data = to_dataframe(data).copy()

        to_transform, proc = self._preprocess(data)
        # same up-casting as is done in `fit` and `transform`
        to_transform = self._upcast_for_power(to_transform)

        try:
            data = self.transformer_.fit_transform(to_transform.values, **kwargs)
        except ValueError as e:
            raise ValueError(
                f"Transformation {self.method} of {self.features} features raised {e}"
            ) from e

        return self._postprocess(data, to_transform, original_data, proc, return_proc)

    def _postprocess(self, data, to_transform, original_data, proc, return_proc):
        """Converts the transformed array into dataframe, puts the un-transformed
        columns back and reverts the pre-processing."""
        data = pd.DataFrame(data, columns=to_transform.columns)

        data = self.maybe_insert_features(original_data, data)

        data = proc.postprocess(data)

        if return_proc:
            return data, proc
        return data

    def inverse_transform(self,
                          data,
                          postprocessor: _Processor = None,
                          without_fit=False,
                          **kwargs):
        """
        Inverse transforms the data. The input `data` is not modified.

        Parameters
        ---------
            data:
                data on which to apply inverse transformation
            postprocessor :
                the processor returned by ``fit_transform``/``transform`` when
                ``return_proc`` is True. If given, it is used to put zeros and
                -ve signs back.
            without_fit : bool
                if True, ``inverse_transform_without_fit`` method of the
                transformer is used.
            kwargs : any of the following keyword arguments

                - transformer : transformer to use for inverse transformation. If not given, then
                    the available transformer is used.

        Raises
        ------
            TransformerNotFittedError
                if no transformer is available.
        """
        self.transforming_straight = False

        # during transform, we convert to df even when input is list or np array
        # which inserts columns/features into data.
        data = to_dataframe(data)

        if 'transformer' in kwargs:
            transformer = kwargs['transformer']
        elif self.transformer_ is not None:
            transformer = self.transformer_
        elif self.method in SP_METHODS:
            transformer = INITIATED_TRANSFORMERS[self.method]
            without_fit = True
        else:
            raise TransformerNotFittedError()

        if self.treat_negatives:
            # NOTE(review): nothing visible in this class sets `negative_indices_`
            # on the Transformation itself; the lookup on `self` is kept only
            # for backward compatibility.
            copied = False
            for holder in (postprocessor, self):
                negative_indices = getattr(holder, "negative_indices_", None)
                if not negative_indices:
                    continue
                if not copied:
                    # the signs are flipped in place, so work on a copy in
                    # order to leave the caller's data untouched
                    data = data.copy()
                    copied = True
                for col, idx in negative_indices.items():
                    data.iloc[idx, col] = -data.iloc[idx, col].to_numpy()

        original_data = data.copy()
        to_transform = self.get_features(data)

        if without_fit:
            data = transformer.inverse_transform_without_fit(to_transform)
        else:
            data = transformer.inverse_transform(to_transform.values)

        data = pd.DataFrame(data, columns=to_transform.columns)

        data = self.maybe_insert_features(original_data, data)

        if postprocessor is not None:
            data = postprocessor.postprocess(data)

        return data

    def get_features(self, data) -> pd.DataFrame:
        """returns those columns of `data` which are to be transformed"""
        if self.features is None:
            return data
        else:
            assert isinstance(self.features, list)
            return data[self.features]

    def serialize_transformer(self, transformer):
        """sets `transformer` as the current transformer and returns it in a
        dictionary together with a time-stamped key"""
        key = self.method + str(dateandtime_now())
        serialized_transformer = {
            "transformer": transformer,
            "key": key
        }
        self.transformer_ = transformer

        return serialized_transformer

    def get_transformer_from_dict(self, **kwargs):
        """returns the value of keyword argument ``transformer``

        Raises:
            TransformerNotFittedError : if ``transformer`` is not given.
        """
        if 'transformer' in kwargs:
            transformer = kwargs['transformer']
        else:
            raise TransformerNotFittedError()
        return transformer

    def maybe_insert_features(self, original_df, trans_df):
        """Puts the columns which were not transformed back, so that the returned
        dataframe has the same shape and index as `original_df`."""
        trans_df.index = original_df.index

        num_features = len(original_df.columns)
        if len(trans_df.columns) != num_features:
            # take the transformed column when available, otherwise the original
            # one, and concatenate only once instead of growing a frame in a loop
            pieces = [pd.DataFrame(index=original_df.index)]
            for col in original_df.columns:
                if col in trans_df.columns:
                    pieces.append(trans_df[col])
                else:
                    pieces.append(original_df[col])
            df = pd.concat(pieces, axis=1)
        else:
            df = trans_df

        assert df.shape == original_df.shape, f"shape changed from {original_df.shape} to {df.shape}"
        return df

    def config(self) -> dict:
        """returns a dictionary which can be used to reconstruct `Transformation`
        class using `from_config`.

        Returns:
            a dictionary
        """
        assert self.transformer_ is not None, "Transformation is not fitted yet"

        return {
            "transformer": {self.method: self.transformer_.config()},
            "shape": self.initial_shape_,
            "method": self.method,
            "features": self.features,
            "replace_zeros": self.replace_zeros,
            "replace_zeros_with": self.replace_zeros_with,
            "treat_negatives": self.treat_negatives,
            "kwargs": self.kwargs,
        }

    @classmethod
    def from_config(
            cls,
            config: dict
    ) -> "Transformation":
        """constructs the `Transformation` class from `config` which has
        already been fitted/transformed.

        Arguments:
            config:
                a dictionary which is the output of `config()` method.

        Returns:
            an instance of `Transformation` class.
        """
        config = deepcopy_dict_without_clone(config)

        shape = config.pop('shape')
        transformer = config.pop('transformer')

        assert len(transformer) == 1
        transformer_name, transformer_config = next(iter(transformer.items()))

        # `kwargs` may be missing from the config, in which case no extra
        # arguments are forwarded
        kwargs = config.pop('kwargs', None) or {}

        instance = cls(**config, **kwargs)

        # initiate the transformer
        tr_initiated = instance.available_transformers[transformer_name].from_config(transformer_config)
        instance.transformer_ = tr_initiated
        instance.initial_shape_ = shape

        return instance

    def plot_comparison(
            self,
            data,
            plot_type: str = "hist",
            show: bool = True,
            figsize: tuple = None,
            **kwargs
    ) -> plt.Figure:
        """
        compares original and transformed data

        Parameters
        ----------
            data :
                the data on which to apply transformation. It can list, numpy array or pandas dataframe
            plot_type : str, optional (default="hist")
                either ``hist``, ``probplot`` or ``line``
            show : bool, optional (default=True)
                whether to show the plot or not
            figsize : tuple, optional (default=None)
                figure size (width, height)
            **kwargs :
                any keyword arguments for easy_mpl.hist or easy_mpl.plot when
                plot_type is "hist" or "probplot" respectively.

        Returns
        -------
        plt.Figure

        Examples
        --------
            >>> from ai4water.preprocessing import Transformation
            >>> import numpy as np
            >>> t = Transformation()
            >>> t.plot_comparison(np.random.randint(1, 100, (100, 2)))
            ... # compare using probability plot
            >>> t.plot_comparison(np.random.randint(1, 100, (100, 2)), "probplot")
            ... # or a simple line plot
            >>> t.plot_comparison(np.random.randint(1, 100, (100, 2)), "line", figsize=(14, 6))
        """
        x_ = self.fit_transform(data)

        funcs = {
            "hist": hist,
            "probplot": probplot,
            "line": plot
        }
        func = funcs[plot_type]

        if len(x_) == x_.size:
            # it is 1d
            fig, axes = plt.subplots(1, 2, figsize=figsize)
            func(data, ax=axes[0], **kwargs, ax_kws=dict(title="original"), show=False)
            func(x_, ax=axes[1], **kwargs, ax_kws=dict(title="Transformed"), show=False)
        else:
            fig, axes = plt.subplots(x_.shape[1], 2, figsize=figsize)

            # lists do not support 2D slicing, therefore always work with an array
            original = to_dataframe(data).values

            for idx, (ax_original, ax_transformed) in enumerate(axes):
                # only the top row gets the titles
                title1, title2 = None, None
                if idx == 0:
                    title1, title2 = "Original", "Transformed"

                func(original[:, idx], ax=ax_original, ax_kws=dict(title=title1),
                     show=False, **kwargs)
                func(x_.iloc[:, idx], ax=ax_transformed, ax_kws=dict(title=title2),
                     show=False, **kwargs)

        plt.suptitle(self.method)

        if show:
            plt.show()

        return fig
def hist(x, ax, **kwargs):
    """Draws histogram of `x` on `ax` using ``easy_mpl.hist``. All keyword
    arguments are forwarded and whatever ``easy_mpl.hist`` returns is returned."""
    output = em.hist(x, ax=ax, **kwargs)
    return output
def plot(x, ax, **kwargs):
    """Draws `x` on `ax` using ``easy_mpl.plot`` after flattening it. `x` must
    not have more elements than its first dimension, i.e. it must be
    effectively one dimensional."""
    values = np.array(x)
    # make sure that it is 1D
    assert len(values) == np.size(values)
    flattened = values.reshape(-1,)
    return em.plot(flattened, ax=ax, **kwargs)
def probplot(x, ax, **kwargs):
    """Draws probability plot of `x` against the normal distribution on `ax`.

    ``stats.probplot`` is given `ax` as its ``plot`` argument and the
    theoretical and ordered values it returns are then drawn with
    ``easy_mpl.plot``, whose output is returned. `x` must be effectively
    one dimensional.
    """
    values = np.array(x)
    # make sure that it is 1D
    assert len(values) == np.size(values)
    flattened = values.reshape(-1,)

    quantiles, fit_results = stats.probplot(flattened, dist="norm", plot=ax)
    osm, osr = quantiles
    # the fit results are not used but are still expected to be three values
    slope, intercept, r = fit_results

    return em.plot(osm, osr, ax=ax, **kwargs)
def get_val(df: pd.DataFrame, method):
    """Returns the value which is to be used as replacement.

    Arguments:
        df : pandas object from which the value is calculated when `method`
            is a string.
        method : either a number, which is returned as it is, or one of
            'mean', 'max', 'min' (case insensitive), in which case the
            corresponding method of `df` is called.

    Raises:
        ValueError : if `method` is neither a number nor one of the allowed
            strings.
    """
    if isinstance(method, str):
        name = method.lower()
        if name in ("mean", "max", "min"):
            return getattr(df, name)()
        # an unknown string must not silently result in None being returned
        raise ValueError(f"unknown method {method} to replace nan vlaues")

    if isinstance(method, (int, float)):
        return method

    raise ValueError(f"unknown method {method} to replace nan vlaues")
class InvalidValueError(Exception):
    """Raised when the data contains values on which a transformation can not
    be applied.

    Arguments:
        method : name of the transformation method.
        reason : kind of the offending values. A remedy is suggested for
            "NaN", "zero" and "negative".
    """

    # suggestion for each of the known reasons
    _REMEDIES = {
        "NaN": "Try setting 'replace_nans' to True",
        "zero": "Try setting 'replace_zeros' to True",
        "negative": "Try setting 'treat_negatives' to True",
    }

    def __init__(self, method, reason):
        # populate `args` so that the exception has a meaningful repr
        super().__init__(method, reason)
        self.method = method
        self.reason = reason

    def remedy(self):
        """returns the suggestion for `reason`, or an empty string when the
        reason is not a known one."""
        return self._REMEDIES.get(self.reason, "")

    def __str__(self):
        return (
            f"\nInput data contains {self.reason} values so {self.method} transformation\n"
            f"can not be applied.\n"
            f"{self.remedy()}\n"
        )
def to_dataframe(data) -> pd.DataFrame:
    """Returns `data` as pandas DataFrame.

    A DataFrame is returned as it is (not copied). Anything else is first
    converted into numpy array, one dimensional arrays are considered as a
    single column, and a DataFrame with default index and columns is built
    from the array.
    """
    if isinstance(data, pd.DataFrame):
        return data

    array = np.array(data)
    if array.ndim == 1:
        # a single feature with many examples
        array = array.reshape(-1, 1)

    return pd.DataFrame(array)