import gc
import json
import math
import warnings
from typing import Union, Tuple, List, Callable, Optional
from SeqMetrics import RegressionMetrics, ClassificationMetrics
from ai4water.backend import tf, os, np, pd, plt, easy_mpl, sklearn
from ai4water.backend import xgboost, catboost, lightgbm
from ai4water.hyperopt import HyperOpt
from ai4water.preprocessing import DataSet
from ai4water.utils.utils import make_model
from ai4water.utils.utils import TrainTestSplit
from ai4water.utils.utils import jsonize, ERROR_LABELS
from ai4water.utils.utils import AttribtueSetter
from ai4water.postprocessing import ProcessPredictions
from ai4water.utils.visualizations import edf_plot
from ai4water.utils.utils import create_subplots
from ai4water.utils.utils import find_best_weight, dateandtime_now, dict_to_file
from ai4water.functional import Model as FModel
from ai4water._main import BaseModel
plot = easy_mpl.plot
bar_chart = easy_mpl.bar_chart
taylor_plot = easy_mpl.taylor_plot
dumbbell_plot = easy_mpl.dumbbell_plot
reg_plot = easy_mpl.regplot
if tf is not None:
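    # interpret the first two version segments as a zero-padded integer,
    # e.g. '2.4.1' -> '24' -> 240, so that 2.3 <= tf version < 2.5 can be tested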
if 230 <= int(''.join(tf.__version__.split('.')[0:2]).ljust(3, '0')) < 250:
from ai4water.functional import Model
print(f"""
Switching to functional API due to tensorflow version {tf.__version__}
for experiments""")
else:
from ai4water import Model
else:
from ai4water import Model
SEP = os.sep
# todo plots comparing different models in following youtube videos at 6:30 and 8:00 minutes.
# https://www.youtube.com/watch?v=QrJlj0VCHys
# compare models using statistical tests such as Giacomini-White test or Diebold-Mariano test
# paired ttest 5x2cv
# in order to unify the use of metrics
Metrics = {
'regression': lambda t, p, multiclass=False, **kwargs: RegressionMetrics(
t, p, **kwargs),
'classification': lambda t, p, multiclass=False, **kwargs: ClassificationMetrics(
t, p, multiclass=multiclass, **kwargs)
}
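# both entries share one call signature, e.g. Metrics['regression'](true, pred).r2()
# or Metrics['classification'](true, pred, multiclass=True).accuracy()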
Monitor = {
'regression': ['r2', 'corr_coeff', 'mse', 'rmse', 'r2_score',
'nse', 'kge', 'mape', 'pbias', 'bias', 'mae', 'nrmse',
'mase'],
'classification': ['accuracy', 'precision', 'recall', 'f1_score']
}
reg_dts = ["ExtraTreeRegressor","DecisionTreeRegressor",
"ExtraTreesRegressor", "RandomForestRegressor",
"AdaBoostRegressor", "BaggingRegressor",
"HistGradientBoostingRegressor", "GradientBoostingRegressor"]
cls_dts = ["DecisionTreeClassifier", "ExtraTreeClassifier",
"ExtraTreesClassifier", "AdaBoostClassifier","RandomForestClassifier", "BaggingClassifier"
"GradientBoostingClassifier", "HistGradientBoostingClassifier"]
if xgboost is not None:
reg_dts += ["XGBRegressor"]
cls_dts += ["XGBClassifier"]
if catboost is not None:
reg_dts += ["CatBoostRegressor"]
cls_dts += ["CatBoostClassifier"]
if lightgbm is not None:
reg_dts += ["LGBMRegressor"]
cls_dts += ["LGBMClassifier"]
DTs = {"regression":
reg_dts,
"classification":
cls_dts
}
LMs = {
"regression":
["LinearRegression", "Ridge", "RidgeCV", "SGDRegressor",
"ElasticNetCV", "ElasticNet",
"Lasso", "LassoCV", "Lars", "LarsCV", "LassoLars", "LassoLarsCV", "LassoLarsIC"],
"classification":
["LogisticRegression", "LogisticRegressionCV", "PassiveAggressiveClassifier", "Perceptron",
"RidgeClassifier", "RidgeClassifierCV", "SGDClassifier", "SGDClassifierCV"]
}
class Experiments(object):
"""
Base class for all the experiments.
All the experiments must be subclasses of this class.
The core idea of ``Experiments`` is based upon ``model``. An experiment
consists of one or more models. The models differ from each other in their
structure/idea/concept/configuration. When :py:meth:`ai4water.experiments.Experiments.fit`
    is called, each ``model`` is built and trained. The user can customize the building
    and training process by subclassing this class and customizing the
:py:meth:`ai4water.experiments.Experiments._build` and
:py:meth:`ai4water.experiments.Experiments._fit` methods.
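    As an illustrative sketch (the model name and its config below are
    hypothetical), a custom experiment could look like this:
    >>> class MyExperiments(Experiments):
    ...     @property
    ...     def mode(self):
    ...         return "regression"
    ...     @property
    ...     def category(self):
    ...         return "ML"
    ...     def model_RandomForestRegressor(self, **kwargs):
    ...         # each `model_` method returns a config dict for ai4water's Model
    ...         return {'model': {'RandomForestRegressor': kwargs}}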
Attributes
------------
- metrics
- exp_path
- model_
- models
Methods
--------
- fit
- taylor_plot
- loss_comparison
- plot_convergence
- from_config
- compare_errors
- plot_improvement
- compare_convergence
- plot_cv_scores
- fit_with_tpot
"""
    def __init__(
self,
cases: dict = None,
exp_name: str = None,
num_samples: int = 5,
verbosity: int = 1,
monitor: Union[str, list, Callable] = None,
show: bool = True,
save: bool = True,
**model_kws,
):
"""
Arguments
---------
cases :
python dictionary defining different cases/scenarios. See TransformationExperiments
for use case.
exp_name :
name of experiment, used to define path in which results are saved
        num_samples :
            only relevant when you want to optimize the hyperparameters of models
            using the ``grid`` method
        verbosity : int, optional (default=1)
            determines the amount of information being printed
        monitor : str, list, optional
            list of performance metrics to monitor. It can be any performance
            metric from the SeqMetrics_ library.
            By default ``r2``, ``corr_coeff``, ``mse``, ``rmse``, ``r2_score``,
            ``nse``, ``kge``, ``mape``, ``pbias``, ``bias``, ``mae``, ``nrmse``
            and ``mase`` are considered for regression and ``accuracy``, ``precision``,
            ``recall`` and ``f1_score`` are considered for classification. The user
            can also give a custom metric to monitor. In such a case it must be a
            callable which accepts two input arguments. The first one is the array
            of true values and the second is the array of predicted values.
>>> def f1_score(t,p)->float:
>>> return ClassificationMetrics(t, p).f1_score(average="macro")
>>> monitor = [f1_score, "accuracy"]
        Here ``f1_score`` is a function which accepts two arrays.
**model_kws :
keyword arguments which are to be passed to `Model`
and are not optimized.
.. _SeqMetrics:
https://seqmetrics.readthedocs.io/en/latest/index.html
"""
self.opt_results = None
self.optimizer = None
self.exp_name = 'Experiments_' + str(dateandtime_now()) if exp_name is None else exp_name
self.num_samples = num_samples
self.verbosity = verbosity
self.show = show
self.save = save
self.models = [method for method in dir(self) if method.startswith('model_')]
if cases is None:
cases = {}
self.cases = {'model_' + key if not key.startswith('model_') else key: val for key, val in cases.items()}
self.models = self.models + list(self.cases.keys())
self.exp_path = os.path.join(os.getcwd(), "results", self.exp_name)
if not os.path.exists(self.exp_path):
os.makedirs(self.exp_path)
self.eval_models = {}
self.optimized_models = {}
if monitor is None:
self.monitor = Monitor[self.mode]
else:
if not isinstance(monitor, list):
monitor = [monitor]
self.monitor = monitor
self.model_kws = model_kws
# _run_type is actually set during call to .fit
self._run_type = None
@property
def category(self)->str:
raise NotImplementedError
@property
def plots_(self)->list:
if self.mode == "regression":
return ['regression', 'prediction', "residual", "edf"]
return []
def metric_kws(self, metric_name:str=None)->dict:
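        """keyword arguments (if any) with which ``metric_name`` should be computed"""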
return {}
def _pre_build_hook(self, **suggested_paras):
"""Anything that needs to be performed before building the model."""
return suggested_paras
def config(self)->dict:
_config = {
"models":self.models,
"exp_path": self.exp_path,
"exp_name": self.exp_name,
"cases": self.cases,
"model_kws": jsonize(self.model_kws),
"eval_models": self.eval_models,
"optimized_models": self.optimized_models,
}
for attr in ['is_multiclass_', 'is_binary_', 'is_multilabel_', 'considered_models_']:
if hasattr(self, attr):
_config[attr] = getattr(self, attr)
return _config
def save_config(self):
dict_to_file(self.exp_path, config=self.config())
return
def build_from_config(self, config_path:str)->BaseModel:
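        """builds the model from the config file located at ``config_path``
        and sets it as the ``model_`` attribute"""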
assert os.path.exists(config_path), f"{config_path} does not exist"
if self.category == "DL":
model = FModel
else:
model = Model
model = model.from_config_file(config_path=config_path)
assert isinstance(model, BaseModel)
setattr(self, 'model_', model)
return model
    def update_model_weight(
self,
model:Model,
config_path:str
):
"""updates the weight of model.
If no saved weight is found, a warning is raised.
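        Example
        -------
        a minimal sketch, assuming ``exp`` is an ``Experiments`` instance and
        ``path`` points to a folder containing a saved model's config and weights
        >>> model = exp.build_from_config(os.path.join(path, "config.json"))
        >>> exp.update_model_weight(model, path)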
"""
best_weights = find_best_weight(os.path.join(config_path, "weights"))
# Sometimes no weight is saved when the prediction is None,
# todo
# should we raise error in such case? What are other cases when best_weights can be None
if best_weights is None:
warnings.warn(f"Can't find weight for {model} from \n {config_path}")
return
weight_file = os.path.join(model.w_path, best_weights)
model.update_weights(weight_file=weight_file)
if self.verbosity>1:
print("{} Successfully loaded weights from {} file {}".format('*' * 10, weight_file, '*' * 10))
return
@property
def tpot_estimator(self):
raise NotImplementedError
@property
def mode(self):
raise NotImplementedError
@property
def num_samples(self):
return self._num_samples
@num_samples.setter
def num_samples(self, x):
self._num_samples = x
def _reset(self):
self.cv_scores_ = {}
self.metrics = {}
self.features = {}
self.iter_metrics = {}
self.considered_models_ = []
return
def _named_x0(self)->dict:
x0 = getattr(self, 'x0', None)
param_space = getattr(self, 'param_space', None)
if param_space:
names = [s.name for s in param_space]
if x0:
return {k: v for k, v in zip(names, x0)}
return {}
def _get_config(self, model_type, model_name, **suggested_paras):
# the config must contain the suggested parameters by the hpo algorithm
if model_type in self.cases:
config = self.cases[model_type]
config.update(suggested_paras)
elif model_name in self.cases:
config = self.cases[model_name]
config.update(suggested_paras)
elif hasattr(self, model_type):
config = getattr(self, model_type)(**suggested_paras)
else:
            raise TypeError(f"{model_type} is neither a case nor a method of the experiment")
return config
def _dry_run_a_model(
self,
model_type,
model_name,
cross_validate,
train_x, train_y,
val_x, val_y):
"""runs the `.fit` of allt he models being considered once.
Also populates following attributes
- eval_models
- cv_scores_
- metrics
- features
"""
if self.verbosity >= 0: print(f"running {model_name} model")
config = self._get_config(model_type, model_name, **self._named_x0())
model = self._build_fit(
train_x, train_y,
title=f"{self.exp_name}{SEP}{model_name}",
validation_data=(val_x, val_y),
cross_validate=cross_validate,
**config)
train_results = self._predict(model=model, x=train_x, y=train_y)
self._populate_results(model_name, train_results)
if val_x is not None and (hasattr(val_x, '__len__') and len(val_x)>0):
val_results = self._predict(model=model, x=val_x, y=val_y)
self._populate_results(model_name, train_results=train_results, val_results=val_results)
if cross_validate:
cv_scoring = model.val_metric
            self.cv_scores_[model_type] = getattr(model, 'cross_val_scores')
setattr(self, '_cv_scoring', cv_scoring)
self.eval_models[model_type] = self.model_.path
return
def _optimize_a_model(
self,
model_type,
model_name,opt_method,
num_iterations,
cross_validate,
post_optimize,
train_x, train_y,
val_x, val_y,
**hpo_kws,
):
def objective_fn(**suggested_paras) -> float:
config = self._get_config(model_type, model_name, **suggested_paras)
return self._build_fit_eval(
train_x=train_x,
train_y=train_y,
validation_data=(val_x, val_y),
cross_validate=cross_validate,
title=f"{self.exp_name}{SEP}{model_name}",
**config)
opt_dir = os.path.join(os.getcwd(),
f"results{SEP}{self.exp_name}{SEP}{model_name}")
if self.verbosity > 0:
print(f"optimizing {model_name} using {opt_method} method")
self.optimizer = HyperOpt(
opt_method,
objective_fn=objective_fn,
param_space=self.param_space,
opt_path=opt_dir,
num_iterations=num_iterations, # number of iterations
x0=self.x0,
verbosity=self.verbosity,
**hpo_kws
)
self.opt_results = self.optimizer.fit()
self.optimized_models[model_type] = self.optimizer.opt_path
if cross_validate:
            # if we do train_best, self.model_ will change, so record the
            # cross validation scores of the optimized model now
            cv_scoring = self.model_.val_metric
            self.cv_scores_[model_type] = getattr(self.model_, 'cross_val_scores')
setattr(self, '_cv_scoring', cv_scoring)
x, y = _combine_training_validation_data(train_x, train_y, (val_x, val_y))
if post_optimize == 'eval_best':
train_results = self.eval_best(x, y, model_name, opt_dir)
else:
train_results = self.train_best(x, y, model_name)
self._populate_results(model_type, train_results)
if not hasattr(self, 'model_'): # todo asking user to define this parameter is not good
raise ValueError(f'The `build` method must set a class level attribute named `model_`.')
self.eval_models[model_type] = self.model_.path
self.iter_metrics[model_type] = self.model_iter_metric
return
    def fit(
self,
x=None,
y=None,
data=None,
validation_data: Optional[tuple] = None,
run_type: str = "dry_run",
opt_method: str = "bayes",
num_iterations: int = 12,
include: Union[None, list, str] = None,
exclude: Union[None, list, str] = '',
cross_validate: bool = False,
post_optimize: str = 'eval_best',
**hpo_kws
):
"""
Runs the fit loop for all the ``models`` of experiment. The user can
however, specify the models by making use of ``include`` and ``exclude``
keywords.
        The data should be defined according to one of the following rules:
        - only x,y are given (validation data will be taken from them according
          to the splitting scheme)
        - or x,y and validation_data are given
        - or only data is given (training and validation data will be
          extracted from it according to the splitting scheme)
Parameters
----------
        x :
            input data. When ``run_type`` is ``dry_run``, then each model is trained
            on this data. If ``run_type`` is ``optimize`` and validation_data is not
            given, then x,y pairs of validation data are extracted from this data
            based upon the splitting scheme i.e. the ``val_fraction`` argument.
y :
label/true/observed data
data :
Raw unprepared data from which x,y pairs for training and validation
will be extracted.
            This will be passed to :py:meth:`ai4water.Model.fit`.
            This is only required if ``x`` and ``y`` are not given.
validation_data :
a tuple which consists of x,y pairs for validation data. This can only
be given if ``x`` and ``y`` are given and ``data`` is not given.
run_type : str, optional (default="dry_run")
One of ``dry_run`` or ``optimize``. If ``dry_run``, then all
the `models` will be trained only once. if ``optimize``, then
hyperparameters of all the models will be optimized.
opt_method : str, optional (default="bayes")
which optimization method to use. options are ``bayes``,
``random``, ``grid``. Only valid if ``run_type`` is ``optimize``
num_iterations : int, optional
number of iterations for optimization. Only valid
if ``run_type`` is ``optimize``.
        include : list/str, optional (default=None)
            names of models to include. If None, all the available models
            will be trained and/or optimized. The shorthand "DTs" selects
            all decision tree based models and "LMs" selects all linear models.
exclude :
name of ``models`` to be excluded
        cross_validate : bool, optional (default=False)
            whether to cross validate the model or not. This
            depends upon the `cross_validator` argument to the `Model`.
post_optimize : str, optional
            one of ``eval_best`` or ``train_best``. If ``eval_best``,
            the weights of the best model will be loaded again and the model
            will be evaluated on train, test and all the data. If ``train_best``,
then a new model will be built and trained using the parameters of
the best model.
**hpo_kws :
keyword arguments for :py:class:`ai4water.hyperopt.HyperOpt` class.
Examples
---------
>>> from ai4water.experiments import MLRegressionExperiments
>>> from ai4water.datasets import busan_beach
>>> exp = MLRegressionExperiments()
>>> exp.fit(data=busan_beach())
If you want to compare only RandomForest, XGBRegressor, CatBoostRegressor
and LGBMRegressor, use the ``include`` keyword
>>> exp.fit(data=busan_beach(), include=['RandomForestRegressor', 'XGBRegressor',
>>> 'CatBoostRegressor', 'LGBMRegressor'])
Similarly, if you want to exclude certain models from comparison, you can
use ``exclude`` keyword
>>> exp.fit(data=busan_beach(), exclude=["SGDRegressor"])
        if you want to perform cross validation for each model, you must give
        the ``cross_validator`` argument which will be passed to the ai4water Model
>>> exp = MLRegressionExperiments(cross_validator={"KFold": {"n_splits": 10}})
>>> exp.fit(data=busan_beach(), cross_validate=True)
Setting ``cross_validate`` to True will populate `cv_scores_` dictionary
which can be accessed as ``exp.cv_scores_``
if you want to optimize the hyperparameters of each model,
>>> exp.fit(data=busan_beach(), run_type="optimize", num_iterations=20)
"""
train_x, train_y, val_x, val_y, _, _ = self.verify_data(
x, y, data, validation_data)
AttribtueSetter(self, train_y)
del x, y, data, validation_data
gc.collect()
        assert run_type in ['optimize', 'dry_run'], f"run_type must be either 'optimize' or 'dry_run' but it is {run_type}"
self._run_type = run_type
assert post_optimize in ['eval_best', 'train_best'], f"""
post_optimize must be either 'eval_best' or 'train_best' but it is {post_optimize}"""
if exclude == '':
exclude = []
if hpo_kws is None:
hpo_kws = {}
models_to_consider = self._check_include_arg(include)
if exclude is None:
exclude = []
elif isinstance(exclude, str):
exclude = [exclude]
consider_exclude(exclude, self.models, models_to_consider)
self._reset()
setattr(self, 'considered_models_', models_to_consider)
for model_type in models_to_consider:
model_name = model_type.split('model_')[1]
self.model_iter_metric = {}
self.iter_ = 0
            # the model method may set attributes, such as param_space, which
            # are needed later, so run the method first if it exists.
if hasattr(self, model_type):
getattr(self, model_type)()
if run_type == 'dry_run':
self._dry_run_a_model(
model_type,
model_name,
cross_validate,
train_x, train_y, val_x, val_y)
else:
self._optimize_a_model(
model_type,
model_name,
opt_method,
num_iterations,
cross_validate,
post_optimize,
train_x, train_y,
val_x, val_y,
**hpo_kws
)
self.save_config()
save_json_file(os.path.join(self.exp_path, 'features.json'), self.features)
save_json_file(os.path.join(self.exp_path, 'metrics.json'), self.metrics)
return
    def eval_best(
self,
x,
y,
model_type:str,
opt_dir:str,
):
"""Evaluate the best models."""
folders = [path for path in os.listdir(opt_dir) if os.path.isdir(os.path.join(opt_dir, path)) and path.startswith('1_')]
if len(folders) < 1:
return self.train_best(x, y, model_type)
assert len(folders) == 1, f"{folders}"
for mod_path in folders:
config_path = os.path.join(opt_dir, mod_path, "config.json")
model = self.build_from_config(config_path)
self.update_model_weight(model, os.path.join(opt_dir, mod_path))
results = self._predict(model, x=x, y=y)
return results
    def train_best(
self,
x,
y,
model_type,
):
"""Finds the best model, builts it, fits it and makes predictions from it."""
best_paras = self.optimizer.best_paras()
if best_paras.get('lookback', 1) > 1:
_model = 'layers'
else:
_model = model_type
title = f"{self.exp_name}{SEP}{model_type}{SEP}best"
model = self._build_fit(x, y,
view=False,
title=title,
cross_validate=False,
                                model={_model: best_paras},
)
results = self._predict(model, x=x, y=y)
return results
def _populate_results(
self,
model_type: str,
train_results: Tuple[np.ndarray, np.ndarray],
val_results: Tuple[np.ndarray, np.ndarray] = None,
test_results: Tuple[np.ndarray, np.ndarray] = None,
):
"""populates self.metrics and self.features dictionaries"""
if not model_type.startswith('model_'): # internally we always use model_ at the start.
model_type = f'model_{model_type}'
metrics = dict()
features = dict()
# save performance metrics of train and test
if train_results is not None:
metrics['train'] = self._get_metrics(*train_results)
features['train'] = {
'true': {'std': np.std(train_results[0])},
'simulation': {'std': np.std(train_results[1])}
}
if val_results is not None:
metrics['val'] = self._get_metrics(*val_results)
features['val'] = {
'true': {'std': np.std(val_results[0])},
'simulation': {'std': np.std(val_results[1])}
}
if test_results is not None:
self.metrics[model_type]['test'] = self._get_metrics(*test_results)
self.features[model_type]['test'] = {
'true': {'std': np.std(test_results[0])},
'simulation': {'std': np.std(test_results[1])}
}
if metrics:
self.metrics[model_type] = metrics
self.features[model_type] = features
return
def _get_metrics(self, true:np.ndarray, predicted:np.ndarray)->dict:
"""get the performance metrics being monitored given true and predicted data"""
metrics_inst = Metrics[self.mode](true, predicted,
replace_nan=True,
replace_inf=True,
multiclass=self.is_multiclass_)
metrics = {}
for metric in self.monitor:
if isinstance(metric, str):
metrics[metric] = getattr(metrics_inst, metric)(**self.metric_kws(metric))
elif callable(metric):
# metric is a callable
metrics[metric.__name__] = metric(true, predicted)
else:
raise ValueError(f"invalid metric f{metric}")
return metrics
    def taylor_plot(
self,
x=None,
y=None,
data=None,
include: Union[None, list] = None,
exclude: Union[None, list] = None,
figsize: tuple = (5, 8),
**kwargs
) -> plt.Figure:
"""
Compares the models using taylor_plot_.
Parameters
----------
x :
input data, if not given, then ``data`` must be given.
y :
target data
data :
raw unprocessed data from which x,y pairs can be drawn. This data
will be passed to DataSet class and :py:meth:`ai4water.preprocessing.DataSet.test_data`
method will be used to draw x,y pairs.
include : str, list, optional
if not None, must be a list of models which will be included.
None will result in plotting all the models.
exclude : str, list, optional
            if not None, must be a list of models which will be excluded.
None will result in no exclusion
figsize : tuple, optional
figure size as (width,height)
**kwargs :
all the keyword arguments for taylor_plot_ function.
Returns
-------
plt.Figure
Example
-------
>>> from ai4water.experiments import MLRegressionExperiments
>>> from ai4water.datasets import busan_beach
>>> data = busan_beach()
>>> inputs = list(data.columns)[0:-1]
>>> outputs = list(data.columns)[-1]
>>> experiment = MLRegressionExperiments(input_features=inputs, output_features=outputs)
>>> experiment.fit(data=data)
>>> experiment.taylor_plot(data=data)
.. _taylor_plot:
https://easy-mpl.readthedocs.io/en/latest/plots.html#easy_mpl.taylor_plot
"""
_, _, _, _, x, y = self.verify_data(data=data, test_data=(x, y))
self._build_predict_from_configs(x, y)
metrics = self.metrics.copy()
include = self._check_include_arg(include)
if exclude is not None:
consider_exclude(exclude, self.models, metrics)
if 'name' in kwargs:
fname = kwargs.pop('name')
else:
fname = 'taylor'
fname = os.path.join(os.getcwd(), f'results{SEP}{self.exp_name}{SEP}{fname}.png')
train_std = [_model['train']['true']['std'] for _model in self.features.values()]
train_std = list(set(train_std))[0]
if 'test' in list(self.features.values())[0]:
test_stds = [_model['test']['true']['std'] for _model in self.features.values()]
test_data_type = "test"
else:
test_stds = [_model['val']['true']['std'] for _model in self.features.values()]
test_data_type = "val"
# if any value in test_stds is nan, set(test_stds)[0] will be nan
        if np.isnan(list(set(test_stds))[0]):
test_std = list(set(test_stds))[1]
else:
test_std = list(set(test_stds))[0]
assert not np.isnan(test_std)
observations = {'train': {'std': train_std},
test_data_type: {'std': test_std}}
simulations = {'train': None, test_data_type: None}
for scen in ['train', test_data_type]:
scen_stats = {}
for model, _metrics in metrics.items():
model_stats = {'std': self.features[model][scen]['simulation']['std'],
'corr_coeff': _metrics[scen]['corr_coeff'],
'pbias': _metrics[scen]['pbias']
}
if model in include:
key = shred_model_name(model)
scen_stats[key] = model_stats
simulations[scen] = scen_stats
ax = taylor_plot(
observations=observations,
simulations=simulations,
figsize=figsize,
show=False,
**kwargs
)
if self.save:
plt.savefig(fname, dpi=600, bbox_inches="tight")
if self.show:
plt.show()
return ax
def _consider_include(self, include: Union[str, list], to_filter:dict):
filtered = {}
include = self._check_include_arg(include)
for m in include:
if m in to_filter:
filtered[m] = to_filter[m]
return filtered
def _check_include_arg(
self,
include:Union[str, List[str]],
default=None,
)->list:
"""
        if include is None, then self.models is returned. The strings "DTs"
        and "LMs" are shorthands for all decision tree based models and all
        linear models respectively.
"""
if default is None:
default = self.models
if isinstance(include, str):
if include == "DTs":
include = DTs[self.mode]
elif include == "LMs":
include = LMs[self.mode]
else:
include = [include]
if include is None:
include = default
include = ['model_' + _model if not _model.startswith('model_') else _model for _model in include]
# make sure that include contains same elements which are present in models
for elem in include:
            assert elem in self.models, f"""
            {elem} given to `include` is not available.
            Available cases are {self.models} and you wanted to include
            {include}
            """
return include
    def plot_improvement(
self,
metric_name: str,
plot_type: str = 'dumbbell',
lower_limit: Union[int, float] = -1.0,
upper_limit: Union[int, float] = None,
name: str = '',
**kwargs
) -> pd.DataFrame:
"""
Shows how much improvement was observed after hyperparameter
optimization. This plot is only available if ``run_type`` was set to
`optimize` in :py:meth:`ai4water.experiments.Experiments.fit`.
Arguments
---------
        metric_name :
            the performance metric to use for comparison
plot_type : str, optional
the kind of plot to draw. Either ``dumbbell`` or ``bar``
lower_limit : float/int, optional (default=-1.0)
clip the values below this value. Set this value to None to avoid clipping.
upper_limit : float/int, optional (default=None)
clip the values above this value
name : str, optional
name of file to save the figure
**kwargs :
any additional keyword arguments for
            `dumbbell plot <https://easy-mpl.readthedocs.io/en/latest/plots.html#easy_mpl.dumbbell_plot>`_
or `bar_chart <https://easy-mpl.readthedocs.io/en/latest/plots.html#easy_mpl.bar_chart>`_
Returns
-------
pd.DataFrame
Examples
--------
>>> from ai4water.experiments import MLRegressionExperiments
>>> from ai4water.datasets import busan_beach
>>> experiment = MLRegressionExperiments()
>>> experiment.fit(data=busan_beach(), run_type="optimize", num_iterations=30)
>>> experiment.plot_improvement('r2')
...
        >>> # or draw a bar chart
...
>>> experiment.plot_improvement('r2', plot_type='bar')
"""
assert self._run_type == "optimize", f"""
when run_type argument during .fit() is {self._run_type}, we can
not have improvement plot"""
data: str = 'test'
assert data in ['training', 'test', 'validation']
improvement = pd.DataFrame(columns=['start', 'end'])
for model, model_iter_metrics in self.iter_metrics.items():
initial = model_iter_metrics[0][metric_name]
final = self.metrics[model]['test'][metric_name]
key = shred_model_name(model)
improvement.loc[key] = [initial, final]
baseline = improvement['start']
        if lower_limit is not None:
            baseline = np.where(baseline < lower_limit, lower_limit, baseline)
        if upper_limit is not None:
            baseline = np.where(baseline > upper_limit, upper_limit, baseline)
improvement['start'] = baseline
if plot_type == "dumbbell":
dumbbell_plot(
improvement['start'],
improvement['end'],
improvement.index.tolist(),
ax_kws=dict(xlabel=ERROR_LABELS.get(metric_name, metric_name)),
show=False,
**kwargs
)
else:
colors = {
'start': np.array([0, 56, 104]) / 256,
'end': np.array([126, 154, 178]) / 256
}
order = ['start', 'end']
if metric_name in ['r2', 'nse', 'kge', 'corr_coeff', 'r2_mod', 'r2_score']:
order = ['end', 'start']
fig, ax = plt.subplots()
for ordr in order:
bar_chart(improvement[ordr], improvement.index.tolist(),
ax=ax, color=colors[ordr], show=False,
ax_kws={'xlabel':ERROR_LABELS.get(metric_name, metric_name),
'label':ordr}, **kwargs)
ax.legend()
plt.title('Improvement after Optimization')
if self.save:
fname = os.path.join(
os.getcwd(),
f'results{SEP}{self.exp_name}{SEP}{name}_improvement_{metric_name}.png')
plt.savefig(fname, dpi=300, bbox_inches=kwargs.get('bbox_inches', 'tight'))
if self.show:
plt.show()
return improvement
    def compare_errors(
self,
matric_name: str,
x=None,
y=None,
data = None,
cutoff_val: float = None,
cutoff_type: str = None,
sort_by: str = 'test',
ignore_nans: bool = True,
colors = None,
cmaps = None,
figsize:tuple = None,
**kwargs
) -> pd.DataFrame:
"""
        Plots a specific performance metric for all the models which were
run during :py:meth:`ai4water.experiments.Experiments.fit` call.
Parameters
----------
        matric_name : str
            performance metric whose value to plot for all the models
x :
input data, if not given, then ``data`` must be given.
y :
target data
data :
raw unprocessed data from which x,y pairs can be drawn. This data
will be passed to :py:meth:`ai4water.preprocessing.DataSet` class and
:py:meth:`ai4water.preprocessing.DataSet.test_data` method
will be used to draw x,y pairs.
        cutoff_val : float
            if provided, only those models will be plotted for which the
            metric is greater/smaller than this value. This works in conjunction
            with `cutoff_type`.
        cutoff_type : str
            one of ``greater``, ``greater_equal``, ``less`` or ``less_equal``.
            Criteria to apply to cutoff_val. For example, to show only those
            models whose $R^2$ is > 0.5, set this to ``greater``.
sort_by:
            either ``test`` or ``train``. How to sort the results for plotting.
            If 'test', then test performance metrics will be sorted, otherwise
            train performance metrics will be sorted.
        ignore_nans:
            default True. If True, then performance metrics with nans are ignored,
            otherwise nans/empty bars will be shown to depict which models have
            resulted in nans for the given performance metric.
colors :
color for bar chart. To assign separate colors for both bar charts, provide
a list of two.
cmaps :
color map for bar chart. To assign separate cmap for both bar charts, provide
a list of two.
figsize : tuple
figure size as (width, height)
**kwargs :
any keyword argument that goes to `easy_mpl.bar_chart`
returns
-------
pd.DataFrame
            pandas dataframe whose index is models and which has two columns named
            'train' and 'test'. These columns contain the performance metrics of
            the models.
Example
-------
>>> from ai4water.experiments import MLRegressionExperiments
>>> from ai4water.datasets import busan_beach
>>> data = busan_beach()
>>> inputs = list(data.columns)[0:-1]
>>> outputs = list(data.columns)[-1]
>>> experiment = MLRegressionExperiments(input_features=inputs, output_features=outputs)
>>> experiment.fit(data=data)
>>> experiment.compare_errors('mse', data=data)
>>> experiment.compare_errors('r2', data=data, cutoff_val=0.2, cutoff_type='greater')
"""
_, _, _, _, x, y = self.verify_data(data=data, test_data=(x, y))
# populate self.metrics dictionary
self._build_predict_from_configs(x, y)
models = self.sort_models_by_metric(matric_name, cutoff_val, cutoff_type,
ignore_nans, sort_by)
plt.close('all')
fig, axis = plt.subplots(1, 2, sharey='all', figsize=figsize)
labels = [model.split('model_')[1] for model in models.index.tolist()]
models.index = labels
if kwargs is not None:
for arg in ['ax', 'labels', 'values', 'show', 'sort', 'ax_kws']:
assert arg not in kwargs, f"{arg} not allowed in kwargs"
color1, color2 = None, None
if colors is not None:
if hasattr(colors, '__len__') and len(colors)==2:
color1, color2 = colors
else:
color1 = colors
color2 = colors
cmap1, cmap2 = None, None
if cmaps is not None:
if hasattr(cmaps, '__len__') and len(cmaps)==2:
cmap1, cmap2 = cmaps
else:
cmap1 = cmaps
cmap2 = cmaps
bar_chart(ax=axis[0],
labels=labels,
color=color1,
cmap=cmap1,
values=models['train'],
ax_kws={'title':"Train",
'xlabel':ERROR_LABELS.get(matric_name, matric_name)},
show=False,
**kwargs,
)
bar_chart(ax=axis[1],
labels=labels,
values=models.iloc[:, 1],
color=color2,
cmap=cmap2,
ax_kws={'title': models.columns.tolist()[1],
'xlabel':ERROR_LABELS.get(matric_name, matric_name),
'show_yaxis':False},
show=False,
**kwargs
)
appendix = f"{cutoff_val or ''}{cutoff_type or ''}{len(models)}"
if self.save:
fname = os.path.join(
os.getcwd(),
f'results{SEP}{self.exp_name}{SEP}ErrorComprison_{matric_name}_{appendix}.png')
plt.savefig(fname, dpi=100, bbox_inches='tight')
if self.show:
plt.show()
return models
    def loss_comparison(
self,
loss_name: str = 'loss',
include: list = None,
            figsize: tuple = None,
start: int = 0,
end: int = None,
**kwargs
) -> plt.Axes:
"""
        Plots the loss curves of the evaluated models. This method is only available
        if the models being compared are deep learning models.
Parameters
----------
loss_name : str, optional
the name of loss value, must be recorded during training
include:
name of models to include
figsize : tuple
size of the figure
        start : int
            the epoch from which to start plotting the loss curves
        end : int
            the epoch at which to stop plotting the loss curves
**kwargs :
any other keyword arguments to be passed to the
`plot <https://easy-mpl.readthedocs.io/en/latest/plots.html#easy_mpl.plot>`_
Returns
-------
matplotlib axes
Example
-------
>>> from ai4water.experiments import DLRegressionExperiments
>>> from ai4water.datasets import busan_beach
>>> data = busan_beach()
>>> exp = DLRegressionExperiments(
>>> input_features = data.columns.tolist()[0:-1],
>>> output_features = data.columns.tolist()[-1:],
>>> epochs=300,
>>> train_fraction=1.0,
>>> y_transformation="log",
>>> x_transformation="minmax",
>>> )
>>> exp.fit(data=data)
>>> exp.loss_comparison()
you may wish to plot on log scale
>>> exp.loss_comparison(ax_kws={'logy':True})
"""
include = self._check_include_arg(include, self.considered_models_)
if self.model_.category == "ML":
            raise NotImplementedError("loss comparison is not available for non neural-network models")
loss_curves = {}
for _model, _path in self.eval_models.items():
if _model in include:
df = pd.read_csv(os.path.join(_path, 'losses.csv'), usecols=[loss_name])
loss_curves[_model] = df.values
end = end or len(df)
ax_kws = {
'xlabel': "Epochs",
'ylabel': 'Loss'}
if len(loss_curves) > 5:
ax_kws['legend_kws'] = {'bbox_to_anchor': (1.1, 0.99)}
_kws = {'linestyle': '-'}
if kwargs is not None:
if 'ax_kws' in kwargs:
ax_kws.update(kwargs.pop('ax_kws'))
_kws.update(kwargs)
_, axis = plt.subplots(figsize=figsize)
for _model, _loss in loss_curves.items():
label = shred_model_name(_model)
plot(_loss[start:end], ax=axis, label=label, show=False, ax_kws=ax_kws, **_kws)
axis.grid(ls='--', color='lightgrey')
if self.save:
fname = os.path.join(self.exp_path, f'loss_comparison_{loss_name}.png')
plt.savefig(fname, dpi=100, bbox_inches='tight')
if self.show:
plt.show()
return axis
    def compare_convergence(
self,
name: str = 'convergence_comparison',
**kwargs
) -> Union[plt.Axes, None]:
"""
Plots and compares the convergence plots of hyperparameter optimization runs.
Only valid if `run_type=optimize` during :py:meth:`ai4water.experiments.Experiments.fit`
call.
Parameters
----------
name : str
name of file to save the plot
kwargs :
keyword arguments to plot_ function
Returns
-------
        if the optimized models are >1 then it returns the matplotlib axes
on which the figure is drawn otherwise it returns None.
Examples
--------
>>> from ai4water.experiments import MLRegressionExperiments
>>> from ai4water.datasets import busan_beach
>>> experiment = MLRegressionExperiments()
>>> experiment.fit(data=busan_beach(), run_type="optimize", num_iterations=30)
>>> experiment.compare_convergence()
.. _plot:
https://easy-mpl.readthedocs.io/en/latest/plots.html#easy_mpl.plot
"""
if len(self.optimized_models) < 1:
print('No model was optimized')
return
plt.close('all')
fig, axis = plt.subplots()
for _model, opt_path in self.optimized_models.items():
with open(os.path.join(opt_path, 'iterations.json'), 'r') as fp:
iterations = json.load(fp)
convergence = sort_array(list(iterations.keys()))
label = shred_model_name(_model)
_kws = dict(
linestyle='--',
ax_kws = dict(xlabel='Number of iterations $n$',
ylabel=r"$\min f(x)$ after $n$ calls",
label=label)
)
if kwargs is not None:
_kws.update(kwargs)
plot(
convergence,
ax=axis,
show=False,
**_kws
)
if self.save:
fname = os.path.join(self.exp_path, f'{name}.png')
plt.savefig(fname, dpi=100, bbox_inches='tight')
if self.show:
plt.show()
return axis
def _load_model(self, model_name:str):
"""
builds the model from config and then update the weights
and returns it
"""
m_path = self._get_best_model_path(model_name)
c_path = os.path.join(m_path, 'config.json')
model = self.build_from_config(c_path)
        # update the model with the best saved weights
self.update_model_weight(model, m_path)
return model
    def compare_edf_plots(
self,
x=None,
y=None,
data=None,
exclude:Union[list, str] = None,
figsize=None,
fname: Optional[str] = "edf",
**kwargs
):
"""compare EDF plots of all the models which have been fitted.
This plot is only available for regression problems.
parameters
----------
x :
input data
y :
target data
data :
raw unprocessed data from which x,y pairs of the test data are drawn
exclude : list
name of models to exclude from plotting
figsize :
figure size as (width, height)
fname : str, optional
name of the file to save plot
**kwargs
            any keyword arguments for :py:func:`ai4water.utils.visualizations.edf_plot`
Returns
-------
plt.Figure
matplotlib
Example
-------
>>> from ai4water.experiments import MLRegressionExperiments
>>> from ai4water.datasets import busan_beach
>>> dataset = busan_beach()
>>> inputs = list(dataset.columns)[0:-1]
>>> outputs = list(dataset.columns)[-1]
>>> experiment = MLRegressionExperiments(input_features=inputs, output_features=outputs)
>>> experiment.fit(data=dataset, include="LMs")
>>> experiment.compare_edf_plots(data=dataset, exclude="SGDRegressor")
"""
assert self.mode == "regression", f"This plot is not available for {self.mode} mode"
_, _, _, _, x, y = self.verify_data(data=data, test_data=(x, y))
model_folders = self._get_model_folders()
if exclude is None:
exclude = []
elif isinstance(exclude, str):
exclude = [exclude]
fig, axes = plt.subplots(figsize=figsize)
# load all models from config
for model_name in model_folders:
if model_name not in exclude:
model = self._load_model(model_name)
true, prediction = model.predict(x, y, return_true=True,
process_results=False)
assert len(true) == true.size
assert len(prediction) == prediction.size
error = np.abs(true.reshape(-1,) - prediction.reshape(-1,))
if model_name.endswith("Regressor"):
label = model_name.split("Regressor")[0]
elif model_name.endswith("Classifier"):
label = model_name.split("Classifier")[0]
else:
label = model_name
edf_plot(error, xlabel="Absolute Error", ax=axes, label=label,
show=False, **kwargs)
axes.grid(ls='--', color='lightgrey')
if len(model_folders)>7:
axes.legend(loc=(1.05, 0.0))
if self.save:
fname = os.path.join(self.exp_path, f'{fname}.png')
plt.savefig(fname, dpi=600, bbox_inches='tight')
if self.show:
plt.show()
return
    def compare_regression_plots(
self,
x=None,
y=None,
data=None,
include: Union[None, list] = None,
figsize: tuple=None,
fname: Optional[str] = "regression",
**kwargs
)->plt.Figure:
"""compare regression plots of all the models which have been fitted.
This plot is only available for regression problems.
parameters
----------
x :
input data
y :
target data
data :
raw unprocessed data from which x,y pairs of the test data are drawn
include : str, list, optional
if not None, must be a list of models which will be included.
None will result in plotting all the models.
figsize :
            figure size as (width, height)
fname : str, optional
name of the file to save the plot
**kwargs
            any keyword arguments for :obj:`easy_mpl.regplot`
Returns
-------
plt.Figure
matplotlib
Example
-------
>>> from ai4water.experiments import MLRegressionExperiments
>>> from ai4water.datasets import busan_beach
>>> dataset = busan_beach()
>>> inputs = list(dataset.columns)[0:-1]
>>> outputs = list(dataset.columns)[-1]
>>> experiment = MLRegressionExperiments(input_features=inputs, output_features=outputs)
>>> experiment.fit(data=dataset)
>>> experiment.compare_regression_plots(data=dataset)
"""
assert self.mode == "regression", f"""
This plot is not available for {self.mode} mode"""
_, _, _, _, x, y = self.verify_data(data=data, test_data=(x, y))
model_folders = self._get_model_folders()
include = self._check_include_arg(include, self.considered_models_)
fig, axes = create_subplots(naxes=len(include),
figsize=figsize, sharex="all")
if not isinstance(axes, np.ndarray):
axes = np.array(axes)
# load all models from config
for model_name, ax in zip(include, axes.flat):
model_name = model_name.split('model_')[1]
model = self._load_model(model_name)
true, prediction = model.predict(x, y, return_true=True,
process_results=False)
if np.isnan(prediction).sum() == prediction.size:
if self.verbosity>=0:
print(f"Model {model_name} only predicted nans")
continue
reg_plot(true, prediction, marker_size=5, ax=ax, show=False,
**kwargs)
ax.set_xlabel('')
ax.set_ylabel('')
if model_name.endswith("Regressor"):
label = model_name.split("Regressor")[0]
elif model_name.endswith("Classifier"):
label = model_name.split("Classifier")[0]
else:
label = model_name
ax.legend(labels=[label],
fontsize=9,
numpoints=2,
fancybox=False,
framealpha=0.0)
if hasattr(fig, "supxlabel"):
fig.supxlabel("Observed", fontsize=14)
fig.supylabel("Predicted", fontsize=14)
if self.save:
fname = os.path.join(self.exp_path, f'{fname}.png')
plt.savefig(fname, dpi=600, bbox_inches='tight')
if self.show:
plt.show()
return fig
    def compare_residual_plots(
self,
x=None,
y=None,
data = None,
include: Union[None, list] = None,
figsize: tuple = None,
fname: Optional[str] = "residual"
)->plt.Figure:
"""compare residual plots of all the models which have been fitted.
This plot is only available for regression problems.
parameters
----------
x :
input data
y :
target data
        data :
            raw unprocessed data from which test x,y pairs are drawn using the
            :py:class:`ai4water.preprocessing.DataSet` class. Only valid if x and y are not given.
include : str, list, optional
if not None, must be a list of models which will be included.
None will result in plotting all the models.
figsize : tuple
figure size as (width, height)
fname : str, optional
name of file to save the plot
Returns
-------
plt.Figure
matplotlib
Example
-------
>>> from ai4water.experiments import MLRegressionExperiments
>>> from ai4water.datasets import busan_beach
>>> dataset = busan_beach()
>>> inputs = list(dataset.columns)[0:-1]
>>> outputs = list(dataset.columns)[-1]
>>> experiment = MLRegressionExperiments(input_features=inputs, output_features=outputs)
>>> experiment.fit(data=dataset)
>>> experiment.compare_residual_plots(data=dataset)
"""
assert self.mode == "regression", f"This plot is not available for {self.mode} mode"
include = self._check_include_arg(include, self.considered_models_)
_, _, _, _, x, y = self.verify_data(data=data, test_data=(x, y))
model_folders = self._get_model_folders()
fig, axes = create_subplots(naxes=len(include), figsize=figsize)
if not isinstance(axes, np.ndarray):
axes = np.array(axes)
# load all models from config
for model_and_name, ax in zip(include, axes.flat):
model_name = model_and_name.split('model_')[1]
model = self._load_model(model_name)
true, prediction = model.predict(x, y, return_true=True, process_results=False)
plot(
prediction,
true - prediction,
'o',
show=False,
ax=ax,
color="darksalmon",
markerfacecolor=np.array([225, 121, 144]) / 256.0,
markeredgecolor="black",
markeredgewidth=0.15,
markersize=1.5,
)
# draw horizontal line on y=0
ax.axhline(0.0)
if model_name.endswith("Regressor"):
label = model_name.split("Regressor")[0]
elif model_name.endswith("Classifier"):
label = model_name.split("Classifier")[0]
else:
label = model_name
ax.legend(labels=[label], fontsize=9,
numpoints=2,
fancybox=False, framealpha=0.0)
if hasattr(fig, "supxlabel"):
fig.supxlabel("Prediction")
fig.supylabel("Residual")
if self.save:
fname = os.path.join(self.exp_path, f'{fname}.png')
plt.savefig(fname, dpi=600, bbox_inches='tight')
if self.show:
plt.show()
return fig
    @classmethod
def from_config(
cls,
config_path: str,
**kwargs
) -> "Experiments":
"""
Loads the experiment from the config file.
Arguments:
            config_path : complete path of the experiment's config file (json)
kwargs : keyword arguments to experiment
Returns:
an instance of `Experiments` class
"""
if not config_path.endswith('.json'):
raise ValueError(f"""
{config_path} is not a json file
""")
with open(config_path, 'r') as fp:
config = json.load(fp)
cv_scores = {}
scoring = "mse"
for model_name, model_path in config['eval_models'].items():
with open(os.path.join(model_path, 'config.json'), 'r') as fp:
model_config = json.load(fp)
# if cross validation was performed, then read those results.
cross_validator = model_config['config']['cross_validator']
if cross_validator is not None:
cv_name = str(list(cross_validator.keys())[0])
scoring = model_config['config']['val_metric']
cv_fname = os.path.join(model_path, f'{cv_name}_{scoring}' + ".json")
if os.path.exists(cv_fname):
with open(cv_fname, 'r') as fp:
cv_scores[model_name] = json.load(fp)
exp = cls(exp_name=config['exp_name'], cases=config['cases'], **kwargs)
#exp.config = config
exp._from_config = True
# following four attributes are only available if .fit was run
exp.considered_models_ = config.get('considered_models_', [])
exp.is_binary_ = config.get('is_binary_', None)
exp.is_multiclass_ = config.get('is_multiclass_', None)
exp.is_multilabel_ = config.get('is_multilabel_', None)
exp.metrics = load_json_file(
os.path.join(os.path.dirname(config_path), "metrics.json"))
exp.features = load_json_file(
os.path.join(os.path.dirname(config_path), "features.json"))
exp.cv_scores_ = cv_scores
exp._cv_scoring = scoring
return exp
    def plot_cv_scores(
self,
name: str = "cv_scores",
exclude: Union[str, list] = None,
include: Union[str, list] = None,
**kwargs
) -> Union[plt.Axes, None]:
"""
Plots the box whisker plots of the cross validation scores.
This plot is only available if cross_validation was set to True during
:py:meth:`ai4water.experiments.Experiments.fit`.
Arguments
---------
name : str
name of the file to save the plot
include : str/list
models to include
exclude : models to exclude
**kwargs : any of the following keyword arguments
- notch
- vert
- figsize
- bbox_inches
Returns
-------
matplotlib axes if the figure is drawn otherwise None
Example
-------
>>> from ai4water.experiments import MLRegressionExperiments
>>> from ai4water.datasets import busan_beach
>>> exp = MLRegressionExperiments(cross_validator={"KFold": {"n_splits": 10}})
>>> exp.fit(data=busan_beach(), cross_validate=True)
>>> exp.plot_cv_scores()
"""
if len(self.cv_scores_) == 0:
return
scoring = self._cv_scoring
cv_scores = self.cv_scores_
consider_exclude(exclude, self.models, cv_scores)
cv_scores = self._consider_include(include, cv_scores)
model_names = [m.split('model_')[1] for m in list(cv_scores.keys())]
if len(model_names) < 5:
rotation = 0
else:
rotation = 90
plt.close()
_, axis = plt.subplots(figsize=kwargs.get('figsize', (8, 6)))
axis.boxplot(np.array(list(cv_scores.values())).squeeze().T,
notch=kwargs.get('notch', None),
vert=kwargs.get('vert', None),
labels=model_names
)
axis.set_xticklabels(model_names, rotation=rotation)
axis.set_xlabel("Models", fontsize=16)
axis.set_ylabel(ERROR_LABELS.get(scoring, scoring), fontsize=16)
fname = os.path.join(os.getcwd(),
f'results{SEP}{self.exp_name}{SEP}{name}_{len(model_names)}.png')
if self.save:
plt.savefig(fname, dpi=300, bbox_inches=kwargs.get('bbox_inches', 'tight'))
if self.show:
plt.show()
return axis
def _compare_cls_curves(
self, x, y, func, name,
figsize:tuple=None,
**kwargs
):
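        """compares classification curves (such as ROC or precision-recall)
        of all the fitted models by calling ``func`` on each of them"""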
assert self.mode == "classification", f"""
{name} is only available for classification mode."""
model_folders = [p for p in os.listdir(self.exp_path) if os.path.isdir(os.path.join(self.exp_path, p))]
_, ax = plt.subplots(figsize=figsize)
# find all the model folders
m_paths = []
for m in model_folders:
if any(m in m_ for m_ in self.considered_models_):
m_paths.append(m)
nplots = 0
# load all models from config
for model_name in m_paths:
model = self._load_model(model_name)
kws = {'estimator': model,
'X': x,
'y': y.reshape(-1, ),
'ax': ax,
'name': model.model_name
}
if kwargs:
kws.update(kwargs)
if 'LinearSVC' in model.model_name:
                # sklearn's LinearSVC does not have predict_proba,
                # while the ai4water Model wrapper does have this method,
                # which would only throw an error here
kws['estimator'] = model._model
if model.model_name in ['Perceptron', 'PassiveAggressiveClassifier',
'NearestCentroid', 'RidgeClassifier',
'RidgeClassifierCV']:
continue
if model.model_name in ['NuSVC', 'SVC']:
if not model._model.get_params()['probability']:
continue
if 'SGDClassifier' in model.model_name:
if model._model.get_params()['loss'] == 'hinge':
continue
func(**kws)
nplots += 1
ax.grid(ls='--', color='lightgrey')
if nplots>5:
plt.legend(bbox_to_anchor=(1.1, 0.99))
if self.save:
fname = os.path.join(self.exp_path, f"{name}.png")
plt.savefig(fname, dpi=300, bbox_inches='tight')
if self.show:
plt.show()
return ax
    def compare_precision_recall_curves(
self,
x,
y,
figsize:tuple=None,
**kwargs
):
"""compares precision recall curves of the all the models.
parameters
----------
x :
input data
y :
labels for the input data
figsize : tuple
figure size
        **kwargs :
            any keyword arguments for :obj:`sklearn.metrics.PrecisionRecallDisplay.from_estimator`
Returns
-------
plt.Axes
matplotlib axes on which figure is drawn
Example
-------
>>> from ai4water.datasets import MtropicsLaos
>>> from ai4water.experiments import MLClassificationExperiments
>>> data = MtropicsLaos().make_classification(lookback_steps=1)
# define inputs and outputs
>>> inputs = data.columns.tolist()[0:-1]
>>> outputs = data.columns.tolist()[-1:]
# initiate the experiment
>>> exp = MLClassificationExperiments(
... input_features=inputs,
... output_features=outputs)
# run the experiment
>>> exp.fit(data=data, include=["model_LGBMClassifier",
... "model_XGBClassifier",
... "RandomForestClassifier"])
... # Compare Precision Recall curves
>>> exp.compare_precision_recall_curves(data[inputs].values, data[outputs].values)
"""
return self._compare_cls_curves(
x,
y,
name="precision_recall_curves",
func=sklearn.metrics.PrecisionRecallDisplay.from_estimator,
figsize=figsize,
**kwargs
)
    def compare_roc_curves(
self,
x,
y,
figsize:tuple=None,
**kwargs
):
"""compares roc curves of the all the models.
parameters
----------
x :
input data
y :
labels for the input data
figsize : tuple
figure size
        **kwargs :
            any keyword arguments for :obj:`sklearn.metrics.RocCurveDisplay.from_estimator`
Returns
-------
plt.Axes
matplotlib axes on which figure is drawn
Example
-------
>>> from ai4water.datasets import MtropicsLaos
>>> from ai4water.experiments import MLClassificationExperiments
>>> data = MtropicsLaos().make_classification(lookback_steps=1)
# define inputs and outputs
>>> inputs = data.columns.tolist()[0:-1]
>>> outputs = data.columns.tolist()[-1:]
# initiate the experiment
>>> exp = MLClassificationExperiments(
... input_features=inputs,
... output_features=outputs)
# run the experiment
>>> exp.fit(data=data, include=["model_LGBMClassifier",
... "model_XGBClassifier",
... "RandomForestClassifier"])
... # Compare ROC curves
>>> exp.compare_roc_curves(data[inputs].values, data[outputs].values)
"""
return self._compare_cls_curves(
x=x,
y=y,
name="roc_curves",
func=sklearn.metrics.RocCurveDisplay.from_estimator,
figsize=figsize,
**kwargs
)
    def sort_models_by_metric(
self,
metric_name,
cutoff_val=None,
cutoff_type=None,
ignore_nans: bool = True,
sort_by="test"
) -> pd.DataFrame:
"""returns the models sorted according to their performance"""
idx = list(self.metrics.keys())
metrics = dict()
metrics['train'] = np.array([v['train'][metric_name] for v in self.metrics.values()])
if 'test' in list(self.metrics.values())[0]:
metrics['test'] = np.array([v['test'][metric_name] for v in self.metrics.values()])
else:
metrics['val'] = np.array([v['val'][metric_name] for v in self.metrics.values()])
if 'test' not in metrics and sort_by == "test":
sort_by = "val"
df = pd.DataFrame(metrics, index=idx)
if ignore_nans:
df = df.dropna()
df = df.sort_values(by=[sort_by], ascending=False)
if cutoff_type is not None:
assert cutoff_val is not None
if cutoff_type == "greater":
df = df.loc[df[sort_by] > cutoff_val]
else:
df = df.loc[df[sort_by] < cutoff_val]
return df
    def fit_with_tpot(
self,
data,
models: Union[int, List[str], dict, str] = None,
selection_criteria: str = 'mse',
scoring: str = None,
**tpot_args
):
"""
        Runs tpot_'s fit method, which finds the best pipeline
        for the given data.
Arguments
---------
        data :
            raw data which will be passed to :py:class:`ai4water.preprocessing.DataSet`
            to prepare x,y pairs for training.
models :
It can be of three types.
- If list, it will be the names of machine learning models/
algorithms to consider.
- If integer, it will be the number of top
algorithms to consider for tpot. In such a case, you must have
first run `.fit` method before running this method. If you run
the tpot using all available models, it will take hours to days
for medium sized data (consisting of few thousand examples). However,
if you run first .fit and see for example what are the top 5 models,
then you can set this argument to 5. In such a case, tpot will search
pipeline using only the top 5 algorithms/models that have been found
using .fit method.
- if dictionary, then the keys should be the names of algorithms/models
              and values should be the parameters for each model/algorithm to be
optimized.
- You can also set it to ``all`` consider all models available in
ai4water's Experiment module.
- default is None, which means, the `tpot_config` argument will be None
selection_criteria :
The name of performance metric. If ``models`` is integer, then
            according to this performance metric the models will be chosen.
By default the models will be selected based upon their mse values
on test data.
scoring : the performance metric to use for finding the pipeline.
tpot_args :
any keyword argument for tpot's Regressor_ or Classifier_ class.
This can include arguments like ``generations``, ``population_size`` etc.
Returns
-------
the tpot object
Example
-------
>>> from ai4water.experiments import MLRegressionExperiments
>>> from ai4water.datasets import busan_beach
>>> exp = MLRegressionExperiments(exp_name=f"tpot_reg_{dateandtime_now()}")
>>> exp.fit(data=busan_beach())
>>> tpot_regr = exp.fit_with_tpot(busan_beach(), 2, generations=1, population_size=2)
.. _tpot:
http://epistasislab.github.io/tpot/
.. _Regressor:
http://epistasislab.github.io/tpot/api/#regression
.. _Classifier:
http://epistasislab.github.io/tpot/api/#classification
"""
tpot_caller = self.tpot_estimator
assert tpot_caller is not None, f"tpot must be installed"
param_space = {}
tpot_config = None
for m in self.models:
getattr(self, m)()
ps = getattr(self, 'param_space')
path = getattr(self, 'path')
param_space[m] = {path: {p.name: p.grid for p in ps}}
if isinstance(models, int):
assert len(self.metrics) > 1, f"""
you must first run .fit() method in order to choose top {models} models"""
# sort the models w.r.t their performance
sorted_models = self.sort_models_by_metric(selection_criteria)
# get names of models
models = sorted_models.index.tolist()[0:models]
tpot_config = {}
for m in models:
c: dict = param_space[f"{m}"]
tpot_config.update(c)
elif isinstance(models, list):
tpot_config = {}
for m in models:
c: dict = param_space[f"model_{m}"]
tpot_config.update(c)
elif isinstance(models, dict):
tpot_config = {}
for mod_name, mod_paras in models.items():
if "." in mod_name:
mod_path = mod_name
else:
c: dict = param_space[f"model_{mod_name}"]
mod_path = list(c.keys())[0]
d = {mod_path: mod_paras}
tpot_config.update(d)
elif isinstance(models, str) and models == "all":
tpot_config = {}
for mod_name, mod_config in param_space.items():
mod_path = list(mod_config.keys())[0]
mod_paras = list(mod_config.values())[0]
tpot_config.update({mod_path: mod_paras})
fname = os.path.join(self.exp_path, "tpot_config.json")
with open(fname, 'w') as fp:
json.dump(jsonize(tpot_config), fp, indent=True)
tpot = tpot_caller(
verbosity=self.verbosity + 1,
scoring=scoring,
config_dict=tpot_config,
**tpot_args
)
not_allowed_args = ["cross_validator", "wandb_config", "val_metric",
"loss", "optimizer", "lr", "epochs", "quantiles", "patience"]
        model_kws = dict(self.model_kws)  # copy so the experiment's kws are not mutated
for arg in not_allowed_args:
if arg in model_kws:
model_kws.pop(arg)
dh = DataSet(data, **model_kws)
train_x, train_y = dh.training_data()
tpot.fit(train_x, train_y.reshape(-1, 1))
if "regressor" in self.tpot_estimator.__name__:
mode = "regression"
else:
mode = "classification"
visualizer = ProcessPredictions(path=self.exp_path,
show=bool(self.verbosity),
mode=mode)
for idx, data_name in enumerate(['training', 'test']):
x_data, y_data = getattr(dh, f"{data_name}_data")(key=str(idx))
pred = tpot.fitted_pipeline_.predict(x_data)
r2 = RegressionMetrics(y_data, pred).r2()
# todo, perform inverse transform and deindexification
visualizer(
pd.DataFrame(y_data.reshape(-1, )),
pd.DataFrame(pred.reshape(-1, )),
)
# save the python code of fitted pipeline
tpot.export(os.path.join(self.exp_path, "tpot_fitted_pipeline.py"))
# save each iteration
fname = os.path.join(self.exp_path, "evaluated_individuals.json")
with open(fname, 'w') as fp:
json.dump(tpot.evaluated_individuals_, fp, indent=True)
return tpot
def _build_fit(
self,
train_x=None,
train_y=None,
validation_data=None,
view=False,
title=None,
cross_validate: bool=False,
refit: bool=False,
**kwargs
)->Model:
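        """builds the model with the suggested parameters and fits it
        on the given training data"""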
model: Model = self._build(title=title, **kwargs)
self._fit(
model,
train_x=train_x,
train_y=train_y,
validation_data=validation_data,
cross_validate=cross_validate,
refit = refit,
)
        if view:
            self.model_.view()
return self.model_
def _build_fit_eval(
self,
train_x=None,
train_y=None,
validation_data:tuple=None,
view=False,
title=None,
cross_validate: bool=False,
refit: bool=False,
**kwargs
)->float:
"""
Builds and run one 'model' of the experiment.
Since an experiment consists of many models, this method
is also run many times.
refit : bool
This means fit on training + validation data. This is true
when we have optimized the hyperparameters and now we would
like to fit on training + validation data as well.
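        Returns
        -------
        float
            the validation score computed by ``_evaluate`` (lower is better).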
"""
model = self._build_fit(train_x, train_y,
validation_data, view, title,
cross_validate, refit, **kwargs)
# return the validation score
return self._evaluate(model, *validation_data)
def _build(self, title=None, **suggested_paras):
"""Builds the ai4water Model class and makes it a class attribute."""
suggested_paras = self._pre_build_hook(**suggested_paras)
suggested_paras = jsonize(suggested_paras)
        # 'verbosity' in model_kws overrides the default, but it must not be
        # popped permanently, otherwise it would be lost for later models
        verbosity = self.model_kws.get('verbosity', max(self.verbosity - 1, 0))
        model_kws = {k: v for k, v in self.model_kws.items() if k != 'verbosity'}
        if self.category == "DL":
            model = FModel
        else:
            model = Model
        model = model(
            prefix=title,
            verbosity=verbosity,
            **model_kws,
            **suggested_paras
        )
setattr(self, 'model_', model)
return model
def _fit(
self,
model:Model,
train_x,
train_y,
validation_data,
cross_validate: bool = False,
refit: bool = False
):
"""Trains the model"""
if cross_validate:
return model.cross_val_score(*_combine_training_validation_data(
train_x,
train_y,
validation_data))
if refit:
# we need to combine training (x,y) + validation data.
return model.fit_on_all_training_data(*_combine_training_validation_data(
train_x,
train_y,
validation_data=validation_data))
if self.category == "DL":
model.fit(x=train_x, y=train_y, validation_data=validation_data)
else:
model.fit(x=train_x, y=train_y)
# model_ is used in the class for prediction so it must be the
# updated/trained model
self.model_ = model
return
def _evaluate(
self,
model:Model,
x,
y,
) -> float:
"""Evaluates the model"""
#if validation_data is None:
t, p = model.predict(
x=x, y=y,
return_true=True,
process_results=False)
test_metrics = self._get_metrics(t, p)
metrics = Metrics[self.mode](t, p,
remove_zero=True,
remove_neg=True,
replace_nan=True,
replace_inf=True,
multiclass=self.is_multiclass_)
self.model_iter_metric[self.iter_] = test_metrics
self.iter_ += 1
val_score_ = getattr(metrics, model.val_metric)()
val_score = val_score_
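        # the optimizer always minimizes the objective, so metrics which are
        # better when larger (and all classification metrics) are converted
        # to a loss as ``1 - metric``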
if model.val_metric in [
'r2', 'nse', 'kge', 'r2_mod', 'r2_adj', 'r2_score'
] or self.mode == "classification":
val_score = 1.0 - val_score_
if not math.isfinite(val_score):
val_score = 9999 # TODO, find a better way to handle this
print(f"val_score: {round(val_score, 5)} {model.val_metric}: {val_score_}")
return val_score
def _predict(
self,
model: Model,
x,
y
)->Tuple[np.ndarray, np.ndarray]:
"""
Makes predictions on training and test data from the model.
It is supposed that the model has been trained before."""
true, predicted = model.predict(x, y, return_true=True, process_results=False)
if np.isnan(predicted).sum() == predicted.size:
warnings.warn(f"model {model.model_name} predicted only nans")
else:
ProcessPredictions(self.mode,
forecast_len=model.forecast_len,
path=model.path,
output_features=model.output_features,
plots=self.plots_,
show=bool(model.verbosity),
)(true, predicted)
return true, predicted
[docs] def verify_data(
self,
x=None,
y=None,
data=None,
validation_data: tuple = None,
test_data: tuple = None,
) -> tuple:
"""
        verifies that either
        - only x,y are given (validation data will be taken from them
          according to the splitting scheme)
        - or x,y and validation_data are given (means no test data)
        - or x, y, validation_data and test_data are given
        - or only data is given (training, validation and test data will be
          taken from it according to the splitting scheme)
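        Example (a minimal sketch; assumes an already constructed
        experiment ``exp``, e.g. an ``MLRegressionExperiments`` instance):
        >>> import numpy as np
        >>> x = np.random.random((100, 3))
        >>> y = np.random.random((100, 1))
        >>> # only x,y given: a validation split is made internally,
        >>> # test_x and test_y will be None
        >>> train_x, train_y, val_x, val_y, test_x, test_y = exp.verify_data(x=x, y=y)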
"""
def num_examples(samples):
if isinstance(samples, list):
assert len(set(len(sample) for sample in samples)) == 1
return len(samples[0])
return len(samples)
model_maker = make_model(**self.model_kws)
data_config = model_maker.data_config
if x is None:
            assert y is None, f"y must only be given if x is given. y is {type(y)}"
if data is None:
# x,y and data are not given, we may be given test/validation data
train_x, train_y = None, None
if validation_data is None:
val_x, val_y = None, None
else:
val_x, val_y = validation_data
if test_data is None:
test_x, test_y = None, None
else:
test_x, test_y = test_data
else:
                # case 4, only data is given
                assert validation_data is None, "validation_data must only be given if x is given"
                #assert test_data is None, "test data must only be given if x is given"
data_config.pop('category')
                if 'lookback' in self._named_x0() and 'ts_args' not in self.model_kws:
                    # the value of lookback set by model_maker can be wrong,
                    # because the user expects it to be treated as a hyperparameter
                    data_config['ts_args']['lookback'] = self._named_x0()['lookback']
                # if saving is done during initialization of DataSet while
                # verbosity > 0, the information gets printed twice, so save
                # quietly afterwards instead
                save = data_config.pop('save', True)
dataset = DataSet(data=data,
save=False,
category=self.category,
**data_config)
if save:
verbosity = dataset.verbosity
dataset.verbosity = 0
dataset.to_disk()
dataset.verbosity = verbosity
train_x, train_y = dataset.training_data()
                val_x, val_y = dataset.validation_data()  # todo: what if there is no validation data?
test_x, test_y = dataset.test_data()
if len(test_x) == 0:
test_x, test_y = None, None
elif test_data is None and validation_data is None:
# case 1, only x,y are given
assert num_examples(x) == num_examples(y)
            splitter = TrainTestSplit(data_config['val_fraction'], seed=data_config['seed'] or 313)
if data_config['split_random']:
train_x, val_x, train_y, val_y = splitter.split_by_random(x, y)
else:
train_x, val_x, train_y, val_y = splitter.split_by_slicing(x, y)
test_x, test_y = None, None
elif test_data is None:
# case 2: x,y and validation_data should be given (means no test data)
assert num_examples(x) == num_examples(y)
train_x, train_y = x, y
val_x, val_y = validation_data
test_x, test_y = None, None
else:
# case 3
assert num_examples(x) == num_examples(y)
train_x, train_y = x, y
val_x, val_y = validation_data
test_x, test_y = test_data
return train_x, train_y, val_x, val_y, test_x, test_y
def _get_model_folders(self):
        # find all folders inside the experiment path which belong to the
        # considered models
        model_folders = [p for p in os.listdir(self.exp_path) if os.path.isdir(os.path.join(self.exp_path, p))]
        m_folders = []
for m in model_folders:
if any(m in m_ for m_ in self.considered_models_):
m_folders.append(m)
return m_folders
def _build_predict_from_configs(self, x, y):
model_folders = self._get_model_folders()
# load all models from config
for model_name in model_folders:
model = self._load_model(model_name)
out = model.predict(x=x, y=y, return_true=True, process_results=False)
self._populate_results(f"model_{model_name}", train_results=None, test_results=out)
return
def _get_best_model_path(self, model_name):
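        """Returns the path of the folder which contains the best (or the
        only) trained version of ``model_name``."""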
m_path = os.path.join(self.exp_path, model_name)
if len(os.listdir(m_path)) == 1:
m_path = os.path.join(m_path, os.listdir(m_path)[0])
elif 'best' in os.listdir(m_path):
            # within the 'best' folder there is another folder
m_path = os.path.join(m_path, 'best')
assert len(os.listdir(m_path)) == 1
m_path = os.path.join(m_path, os.listdir(m_path)[0])
else:
folders = [path for path in os.listdir(m_path) if
os.path.isdir(os.path.join(m_path, path)) and path.startswith('1_')]
if len(folders) == 1:
m_path = os.path.join(m_path, folders[0])
else:
raise ValueError(f"Cant find best model in {m_path}")
return m_path
class TransformationExperiments(Experiments):
"""Helper class to conduct experiments with different transformations
Example:
>>> from ai4water.datasets import busan_beach
>>> from ai4water.experiments import TransformationExperiments
>>> from ai4water.hyperopt import Integer, Categorical, Real
... # Define your experiment
>>> class MyTransformationExperiments(TransformationExperiments):
...
... def update_paras(self, **kwargs):
... _layers = {
... "LSTM": {"config": {"units": int(kwargs['lstm_units']}},
... "Dense": {"config": {"units": 1, "activation": kwargs['dense_actfn']}},
... "reshape": {"config": {"target_shape": (1, 1)}}
... }
... return {'model': {'layers': _layers},
... 'lookback': int(kwargs['lookback']),
... 'batch_size': int(kwargs['batch_size']),
... 'lr': float(kwargs['lr']),
... 'transformation': kwargs['transformation']}
>>> data = busan_beach()
>>> inputs = ['tide_cm', 'wat_temp_c', 'sal_psu', 'air_temp_c', 'pcp_mm', 'pcp3_mm']
>>> outputs = ['tetx_coppml']
>>> cases = {'model_minmax': {'transformation': 'minmax'},
... 'model_zscore': {'transformation': 'zscore'}}
>>> search_space = [
... Integer(low=16, high=64, name='lstm_units', num_samples=2),
... Integer(low=3, high=15, name="lookback", num_samples=2),
... Categorical(categories=[4, 8, 12, 16, 24, 32], name='batch_size'),
... Real(low=1e-6, high=1.0e-3, name='lr', prior='log', num_samples=2),
... Categorical(categories=['relu', 'elu'], name='dense_actfn'),
... ]
>>> x0 = [20, 14, 12, 0.00029613, 'relu']
>>> experiment = MyTransformationExperiments(cases=cases, input_features=inputs,
    ...     output_features=outputs, exp_name="testing",
... param_space=search_space, x0=x0)
"""
@property
def mode(self):
return "regression"
@property
def category(self):
return "ML"
def __init__(self,
param_space=None,
x0=None,
cases: dict = None,
exp_name: str = None,
num_samples: int = 5,
verbosity: int = 1,
**model_kws):
self.param_space = param_space
self.x0 = x0
exp_name = exp_name or 'TransformationExperiments' + f'_{dateandtime_now()}'
super().__init__(
cases=cases,
exp_name=exp_name,
num_samples=num_samples,
verbosity=verbosity,
**model_kws
)
@property
def tpot_estimator(self):
return None
def update_paras(self, **suggested_paras):
        raise NotImplementedError("""
You must write the method `update_paras` which should build the Model with suggested parameters
and return the keyword arguments including `model`. These keyword arguments will then
be used to build ai4water's Model class.
""")
def _build(self, title=None, **suggested_paras):
"""Builds the ai4water Model class"""
        suggested_paras = jsonize(suggested_paras)
        # 'verbosity' in model_kws overrides the default, but it must not be
        # popped permanently, otherwise it would be lost for later models
        verbosity = self.model_kws.get('verbosity', max(self.verbosity - 1, 0))
        model_kws = {k: v for k, v in self.model_kws.items() if k != 'verbosity'}
        model = Model(
            prefix=title,
            verbosity=verbosity,
            **self.update_paras(**suggested_paras),
            **model_kws
        )
setattr(self, 'model_', model)
return model
def process_model_before_fit(self, model):
"""So that the user can perform processing of the model by overwriting this method"""
return model
def sort_array(array):
"""
array: [4, 7, 3, 9, 4, 8, 2, 8, 7, 1]
returns: [4, 4, 3, 3, 3, 3, 2, 2, 2, 1]
"""
results = np.array(array, dtype=np.float32)
iters = range(1, len(results) + 1)
return [np.min(results[:i]) for i in iters]
def consider_exclude(exclude: Union[str, list],
models,
models_to_filter: Union[list, dict] = None
):
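    """
    Normalizes ``exclude`` into 'model_'-prefixed names, verifies that each
    name is available in ``models``, and removes it, in place, from
    ``models_to_filter`` (a list or a dict) if given.
    """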
    if exclude is None:
        return
    if isinstance(exclude, str):
        exclude = [exclude]
    exclude = ['model_' + _model if not _model.startswith('model_') else _model for _model in exclude]
    for elem in exclude:
        assert elem in models, f"""
{elem} to `exclude` is not available.
Available models are {models} and you wanted to exclude
{exclude}"""
        if models_to_filter is not None:
            # maybe the model has already been removed from models_to_filter
            # when we considered the include keyword argument
            if elem in models_to_filter:
                if isinstance(models_to_filter, list):
                    models_to_filter.remove(elem)
                else:
                    models_to_filter.pop(elem)
    return
def load_json_file(fpath):
with open(fpath, 'r') as fp:
result = json.load(fp)
return result
def save_json_file(fpath, obj):
with open(fpath, 'w') as fp:
json.dump(jsonize(obj), fp, sort_keys=True, indent=4)
def shred_model_name(model_name):
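    """
    Removes the 'model_' prefix and the 'Regressor' suffix from a model
    name, e.g. 'model_RandomForestRegressor' becomes 'RandomForest'.
    """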
key = model_name[6:] if model_name.startswith('model_') else model_name
key = key[0:-9] if key.endswith("Regressor") else key
return key
def _combine_training_validation_data(
x_train,
y_train,
validation_data=None,
)->tuple:
"""
combines x,y pairs of training and validation data.
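    A sketch with toy arrays:
    >>> import numpy as np
    >>> x_train, y_train = np.zeros((8, 3)), np.zeros((8, 1))
    >>> x_val, y_val = np.ones((2, 3)), np.ones((2, 1))
    >>> x, y = _combine_training_validation_data(x_train, y_train, (x_val, y_val))
    >>> x.shape, y.shape
    ((10, 3), (10, 1))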
"""
if validation_data is None:
return x_train, y_train
x_val, y_val = validation_data
    if isinstance(x_train, list):
        x = []
        for val in range(len(x_train)):
            if x_val is not None:
                _val = np.concatenate([x_train[val], x_val[val]])
            else:
                _val = x_train[val]
            x.append(_val)
        y = y_train
        if hasattr(y_val, '__len__') and len(y_val) > 0:
            y = np.concatenate([y_train, y_val])
elif isinstance(x_train, np.ndarray):
x, y = x_train, y_train
        # if no validation data is available then use only the training data
if x_val is not None:
if hasattr(x_val, '__len__') and len(x_val)>0:
x = np.concatenate([x_train, x_val])
y = np.concatenate([y_train, y_val])
else:
raise NotImplementedError
return x, y