from typing import Callable, Union, List
from easy_mpl import plot
from ai4water.backend import np, os, plt, pd
from ._explain import ExplainerMixin
# todo, optionally show predicted value as dots on plots
def compute_bounds(xmin, xmax, xv):
"""
from shap
Handles any setting of xmax and xmin.
Note that we handle None, float, or "percentile(float)" formats.
"""
if xmin is not None or xmax is not None:
if type(xmin) == str and xmin.startswith("percentile"):
xmin = np.nanpercentile(xv, float(xmin[11:-1]))
if type(xmax) == str and xmax.startswith("percentile"):
xmax = np.nanpercentile(xv, float(xmax[11:-1]))
if xmin is None or xmin == np.nanmin(xv):
xmin = np.nanmin(xv) - (xmax - np.nanmin(xv))/20
if xmax is None or xmax == np.nanmax(xv):
xmax = np.nanmax(xv) + (np.nanmax(xv) - xmin)/20
return xmin, xmax
[docs]class PartialDependencePlot(ExplainerMixin):
"""
Partial dependence plots as introduced by Friedman_ et al., 2001
Example
-------
>>> from ai4water import Model
>>> from ai4water.datasets import busan_beach
>>> from ai4water.postprocessing.explain import PartialDependencePlot
>>> data = busan_beach()
>>> model = Model(model="XGBRegressor")
>>> model.fit(data=data)
# get the data to explain
>>> x, _ = model.training_data()
>>> pdp = PartialDependencePlot(model.predict, x, model.input_features,
>>> num_points=14)
.. _Friedman:
https://doi.org/10.1214/aos/1013203451
"""
[docs] def __init__(
self,
model: Callable,
data,
feature_names=None,
num_points: int = 100,
path=None,
save: bool = True,
show: bool = True,
**kwargs
):
"""Initiates the class
Parameters
----------
model : Callable
the trained/calibrated model which must be callable. It must take the
`data` as input and sprout an array of predicted values. For example
if you are using Keras/sklearn model, then you must pass model.predict
data : np.ndarray, pd.DataFrame
The inputs to the `model`. It can numpy array or pandas DataFrame.
feature_names : list, optional
Names of features. Used for labeling.
num_points : int, optional
determines the grid for evaluation of `model`
path : str, optional
path to save the plots. By default the results are saved in current directory
show:
whether to show the plot or not
save:
whether to save the plot or not
**kwargs :
any additional keyword arguments for `model`
"""
self.model = model
self.num_points = num_points
self.xmin = "percentile(0)"
self.xmax = "percentile(100)"
self.kwargs = kwargs
if isinstance(data, pd.DataFrame):
if feature_names is None:
feature_names = data.columns.tolist()
data = data.values
super().__init__(data=data,
features=feature_names,
path=path or os.getcwd(),
show=show,
save=save
)
[docs] def nd_interactions(
self,
height: int = 2,
ice: bool = False,
show_dist: bool = False,
show_minima: bool = False,
) -> plt.Figure:
"""Plots 2d interaction plots of all features as done in skopt
Arguments:
height:
height of each subplot in inches
ice:
whether to show the ice lines or not
show_dist:
whether to show the distribution of data as histogram or not
show_minima:
whether to show the function minima or not
Returns:
matplotlib Figure
Examples
--------
>>> from ai4water import Model
>>> from ai4water.datasets import busan_beach
>>> from ai4water.postprocessing.explain import PartialDependencePlot
>>> data = busan_beach()
>>> model = Model(model="XGBRegressor")
>>> model.fit(data=busan_beach())
>>> x, _ = model.training_data()
>>> pdp = PartialDependencePlot(model.predict, x, model.input_features,
... num_points=14)
>>> pdp.nd_interactions(show_dist=True)
"""
n_dims = len(self.features)
fig, ax = plt.subplots(n_dims, n_dims, figsize=(height * n_dims, height * n_dims))
fig.subplots_adjust(left=0.05, right=0.95, bottom=0.05, top=0.95,
hspace=0.1, wspace=0.1)
for i in range(n_dims):
for j in range(n_dims):
# diagonal
if i == j:
if n_dims > 1:
ax_ = ax[i, i]
else:
ax_ = ax
self._plot_pdp_1dim(*self.calc_pdp_1dim(self.data, self.features[i]),
self.data,
self.features[i],
show_dist=show_dist,
show_minima=show_minima,
ice=ice, show=False, save=False,
ax=ax_)
# resetting the label
# ax_.set_xlabel(self.features[i])
ax_.set_ylabel(self.features[i])
process_axis(ax_,
xlabel=self.features[i],
top_spine=True,
right_spine=True)
# lower triangle
elif i > j:
self.plot_interaction(
features=[self.features[j],self.features[i]],
ax=ax[i, j],
colorbar=False,
save=False,
show=False,
)
elif j > i:
if not ax[i, j].lines: # empty axes
ax[i, j].axis("off")
if j > 0: # not the left most column
ax[i, j].yaxis.set_ticks([])
ax[i, j].yaxis.set_visible(False)
ax[i, j].yaxis.label.set_visible(False)
if i < n_dims-1: # not the bottom most row
ax[i, j].xaxis.set_ticks([])
ax[i, j].xaxis.set_visible(False)
ax[i, j].xaxis.label.set_visible(False)
if self.save:
fname = os.path.join(self.path, f"pdp_interact_nd")
plt.savefig(fname, bbox_inches="tight", dpi=100*n_dims)
if self.show:
plt.show()
return fig
[docs] def plot_interaction(
self,
features: list,
lookback: int = None,
ax: plt.Axes = None,
plot_type: str = "2d",
cmap=None,
colorbar: bool = True,
show:bool = True,
save:bool = True,
**kwargs
) -> plt.Axes:
"""Shows interaction between two features
Parameters
----------
features :
a list or tuple of two feature names to use
lookback : optional
only relevant in data is 3d
ax : optional
matplotlib axes on which to draw. If not given, current axes will
be used.
plot_type : optional
either "2d" or "surface"
cmap : optional
color map to use
colorbar : optional
whether to show the colorbar or not
show : bool
save : bool
**kwargs :
any keyword argument for axes.plot_surface or axes.contourf
Returns
-------
matplotlib Axes
Examples
--------
>>> from ai4water import Model
>>> from ai4water.datasets import busan_beach
>>> from ai4water.postprocessing.explain import PartialDependencePlot
>>> data = busan_beach()
>>> model = Model(model="XGBRegressor")
>>> model.fit(data=busan_beach())
>>> x, _ = model.training_data()
>>> pdp = PartialDependencePlot(model.predict, x, model.input_features,
... num_points=14)
... # specifying features whose interaction is to be calculated and plotted.
>>> axis = pdp.plot_interaction(["tide_cm", "wat_temp_c"])
"""
if not self.data_is_2d:
raise NotImplementedError
assert isinstance(features, list) and len(features) == 2
x0, x1, pd_vals = self._interaction(features, self.data, lookback)
kwds = {}
if plot_type == "surface":
kwds['projection'] = '3d'
if ax is None:
fig = plt.figure()
ax = fig.add_subplot(111, **kwds)
if plot_type == "surface":
_add_surface(ax, x0, x1, pd_vals, cmap, features[0], features[1], **kwargs)
else:
self._plot_interaction(ax, x0, x1, pd_vals,
cmap=cmap,
features=features,
lookback=lookback,
colorbar=colorbar, **kwargs)
if save:
fname = os.path.join(self.path, f"pdp_interact{features[0]}_{features[1]}")
plt.savefig(fname, bbox_inches="tight", dpi=300)
if show:
plt.show()
return ax
def _plot_interaction(
self, ax, x0, x1, pd_vals, cmap,
features,
lookback,
colorbar=True,
**kwargs):
"""adds a 2d interaction plot"""
cntr = ax.contourf(x0, x1, pd_vals, cmap=cmap, **kwargs)
xv0 = self.xv(self.data, features[0], lookback)
xv1 = self.xv(self.data, features[1], lookback)
ax.scatter(xv0, xv1, c='k', s=10, lw=0.)
process_axis(ax, xlabel=features[0], ylabel=features[1])
if colorbar:
cbar = plt.colorbar(cntr, ax=ax)
cbar.set_label(f"E[f(x) | {features[0]}, {features[1]} ]", rotation=90)
return
def _interaction(self, features, data, lookback):
ind0 = self._feature_to_ind(features[0])
ind1 = self._feature_to_ind(features[1])
xs0 = self.grid(self.data, features[0], lookback)
xs1 = self.grid(self.data, features[1], lookback)
features_tmp = data.copy()
x0 = np.zeros((self.num_points, self.num_points))
x1 = np.zeros((self.num_points, self.num_points))
# instead of calling the model in two for loops, prepare data data
# stack it in 'features_all' and call the model only once.
total_samples = len(data) * self.num_points * self.num_points
features_all = np.full((total_samples, *data.shape[1:]), np.nan)
st, en = 0, len(data)
for i in range(self.num_points):
for j in range(self.num_points):
features_tmp[:, ind0] = xs0[i]
features_tmp[:, ind1] = xs1[j]
x0[i, j] = xs0[i]
x1[i, j] = xs1[j]
features_all[st:en] = features_tmp
st = en
en += len(data)
predictions = self.model(features_all)
pd_vals = np.zeros((self.num_points, self.num_points))
st, en = 0, len(data)
for i in range(self.num_points):
for j in range(self.num_points):
pd_vals[i, j] = predictions[st:en].mean()
st = en
en += len(data)
return x0, x1, pd_vals
[docs] def plot_1d(
self,
feature:Union[str, List[str]],
show_dist: bool = True,
show_dist_as: str = "hist",
ice: bool = True,
feature_expected_value: bool = False,
model_expected_value: bool = False,
show_ci: bool = False,
show_minima: bool = False,
ice_only: bool = False,
ice_color: str = "lightblue",
feature_name:str = None,
pdp_line_kws: dict = None,
ice_lines_kws: dict = None,
hist_kws:dict = None
):
"""partial dependence plot in one dimension
Parameters
----------
feature :
the feature name for which to plot the partial dependence
For one hot encoded categorical features, provide a list
show_dist :
whether to show actual distribution of data or not
show_dist_as :
one of "hist" or "grid"
ice :
whether to show individual component elements on plot or not
feature_expected_value :
whether to show the average value of feature on the plot or not
model_expected_value :
whether to show average prediction on plot or not
show_ci :
whether to show confidence interval of pdp or not
show_minima :
whether to indicate the minima or not
ice_only : bool, False
whether to show only ice plots
ice_color :
color for ice lines. It can also be a valid maplotlib
`colormap <https://matplotlib.org/3.5.1/tutorials/colors/colormaps.html>`_
feature_name : str
name of the feature. If not given, then value of ``feature`` is used.
pdp_line_kws : dict
any keyword argument for axes.plot when plotting pdp lie
ice_lines_kws : dict
any keyword argument for axes.plot when plotting ice lines
hist_kws :
any keyword arguemnt for axes.hist when plotting histogram
Examples
---------
>>> from ai4water import Model
>>> from ai4water.datasets import busan_beach
>>> model = Model(model="XGBRegressor")
>>> data = busan_beach()
>>> model.fit(data=data)
>>> x, _ = model.training_data(data=data)
>>> pdp = PartialDependencePlot(model.predict, x, model.input_features,
... num_points=14)
>>> pdp.plot_1d("tide_cm")
with categorical features
>>> from ai4water.datasets import mg_photodegradation
>>> data, cat_enc, an_enc = mg_photodegradation(encoding="ohe")
>>> model = Model(model="XGBRegressor")
>>> model.fit(data=data)
>>> x, _ = model.training_data(data=data)
>>> pdp = PartialDependencePlot(model.predict, x, model.input_features,
... num_points=14)
>>> feature = [f for f in model.input_features if f.startswith('Catalyst_type')]
>>> pdp.plot_1d(feature)
>>> pdp.plot_1d(feature, show_dist_as="grid")
>>> pdp.plot_1d(feature, show_dist=False)
>>> pdp.plot_1d(feature, show_dist=False, ice=False)
>>> pdp.plot_1d(feature, show_dist=False, ice=False, model_expected_value=True)
>>> pdp.plot_1d(feature, show_dist=False, ice=False, feature_expected_value=True)
"""
if isinstance(feature, tuple):
raise NotImplementedError
else:
if self.single_source:
if self.data_is_2d:
pdp_vals, ice_vals = self.calc_pdp_1dim(self.data, feature)
ax = self._plot_pdp_1dim(
pdp_vals,
ice_vals,
self.data, feature,
show_dist=show_dist,
show_dist_as=show_dist_as,
ice=ice,
feature_expected_value=feature_expected_value,
show_ci=show_ci, show_minima=show_minima,
model_expected_value=model_expected_value,
show=self.show,
save=self.save,
ice_only=ice_only,
ice_color=ice_color,
feature_name=feature_name,
ice_lines_kws=ice_lines_kws,
pdp_line_kws=pdp_line_kws,
hist_kws=hist_kws
)
elif self.data_is_3d:
for lb in range(self.data.shape[1]):
pdp_vals, ice_vals = self.calc_pdp_1dim(self.data, feature, lb)
ax = self._plot_pdp_1dim(
pdp_vals,
ice_vals,
data=self.data,
feature=feature,
lookback=lb,
show_ci=show_ci,
show_minima=show_minima,
show_dist=show_dist,
show_dist_as=show_dist_as,
ice=ice,
feature_expected_value=feature_expected_value,
model_expected_value=model_expected_value,
show=self.show,
save=self.save,
ice_only=ice_only,
ice_color=ice_color,
ice_lines_kws=ice_lines_kws,
pdp_line_kws=pdp_line_kws,
hist_kws=hist_kws)
else:
raise ValueError(f"invalid data shape {self.data.shape}")
else:
for data in self.data:
if self.data_is_2d:
ax = self.calc_pdp_1dim(data, feature)
else:
for lb in []:
ax = self.calc_pdp_1dim(data, feature, lb)
return ax
[docs] def xv(self, data, feature, lookback=None):
ind = self._feature_to_ind(feature)
if data.ndim == 3:
xv = data[:, lookback, ind]
else:
xv = data[:, ind]
return xv
[docs] def grid(self, data, feature, lookback=None):
"""generates the grid for evaluation of model"""
if isinstance(feature, list):
# one hot encoded feature
self.num_points = len(feature)
xs = pd.get_dummies(feature)
return [repeat(xs.iloc[i].values, len(data)) for i in range(len(xs))]
xmin, xmax = compute_bounds(self.xmin,
self.xmax,
self.xv(data, feature, lookback))
return np.linspace(xmin, xmax, self.num_points)
[docs] def calc_pdp_1dim(self, data, feature, lookback=None):
"""calculates partial dependence for 1 dimension data"""
ind = self._feature_to_ind(feature)
xs = self.grid(data, feature, lookback)
data_temp = data.copy()
# instead of calling the model for each num_point, prepare the data
# stack it in 'data_all' and call the model only once
total_samples = len(data) * self.num_points
data_all = np.full((total_samples, *data.shape[1:]), np.nan)
pd_vals = np.full(self.num_points, np.nan)
ice_vals = np.full((self.num_points, data.shape[0]), np.nan)
st, en = 0, len(data)
for i in range(self.num_points):
if data.ndim == 3:
data_temp[:, lookback, ind] = xs[i]
else:
data_temp[:, ind] = xs[i]
data_all[st:en] = data_temp
st = en
en += len(data)
predictions = self.model(data_all, **self.kwargs)
st, en = 0, len(data)
for i in range(self.num_points):
pred = predictions[st:en]
pd_vals[i] = pred.mean()
ice_vals[i, :] = pred.reshape(-1, )
st = en
en += len(data)
return pd_vals, ice_vals
def _feature_to_ind(
self,
feature:Union[str, List[str]]
) -> int:
ind = feature
if isinstance(feature, str):
if self.single_source:
ind = self.features.index(feature)
else:
raise NotImplementedError
elif isinstance(feature, list):
ind = [self.features.index(i) for i in feature]
elif not isinstance(feature, int):
raise ValueError
return ind
def _plot_pdp_1dim(
self,
pd_vals,
ice_vals,
data,
feature,
lookback=None,
show_dist=True, show_dist_as="hist",
ice=True, show_ci=False,
show_minima=False,
feature_expected_value=False,
model_expected_value=False,
show=True, save=False, ax=None,
ice_color="lightblue",
ice_only:bool = False,
feature_name:str = None,
pdp_line_kws:dict = None,
ice_lines_kws:dict = None,
hist_kws:dict = None,
):
xmin, xmax = compute_bounds(self.xmin,
self.xmax,
self.xv(data, feature, lookback))
if ax is None:
fig = plt.figure()
ax = fig.add_axes((0.1, 0.3, 0.8, 0.6))
if isinstance(feature, list):
xs = np.arange(len(feature))
if feature_name is None:
feature_name = f"Feature"
else:
if feature_name is None:
feature_name = feature
xs = self.grid(data, feature, lookback)
ylabel = "E[f(x) | " + feature_name + "]"
if ice:
n = ice_vals.shape[1]
if ice_color in plt.colormaps():
colors = plt.get_cmap(ice_color)(np.linspace(0, 0.8, n))
else:
colors = [ice_color for _ in range(n)]
_ice_lines_kws = dict(linewidth=min(1, 50 / n), alpha=1)
if ice_lines_kws is not None:
_ice_lines_kws.update(ice_lines_kws)
for _ice in range(n):
ax.plot(xs, ice_vals[:, _ice], color=colors[_ice],
**_ice_lines_kws)
ylabel = "f(x) | " + feature_name
if show_ci:
std = np.std(ice_vals, axis=1)
upper = pd_vals + std
lower = pd_vals - std
color = '#66C2D7'
if ice_color != "lightblue":
if ice_color not in plt.colormaps():
color = ice_color
ax.fill_between(xs, upper, lower, alpha=0.14, color=color)
# the line plot
_pdp_line_kws = dict(color='blue', linewidth=2, alpha=1)
if not ice_only:
if pdp_line_kws is not None:
_pdp_line_kws.update(pdp_line_kws)
plot(xs, pd_vals, show=False, ax=ax, **_pdp_line_kws)
title = None
if lookback is not None:
title = f"lookback: {lookback}"
process_axis(ax,
ylabel=ylabel,
ylabel_kws=dict(fontsize=20),
right_spine=False,
top_spine=False,
tick_params=dict(labelsize=11),
xlabel=feature_name,
xlabel_kws=dict(fontsize=20),
title=title)
if isinstance(feature, list):
ax.set_xticks(xs)
ax.set_xticklabels(feature, rotation=90)
ax.xaxis.set_ticks_position('bottom')
ax.yaxis.set_ticks_position('left')
ax2 = ax.twinx()
if show_dist:
_hist_kws = dict(density=False, facecolor='black', alpha=0.1)
if hist_kws is not None:
_hist_kws.update(hist_kws)
xv = self.xv(data, feature, lookback)
if show_dist_as == "hist":
ax2.hist(xv, 50, range=(xmin, xmax), **_hist_kws)
else:
_add_dist_as_grid(fig, xv, other_axes=ax, xlabel=feature,
xlabel_kws=dict(fontsize=20))
process_axis(ax2,
right_spine=False,
top_spine=False,
left_spine=False,
bottom_spine=False,
ylim=(0, data.shape[0]))
ax2.xaxis.set_ticks_position('bottom')
ax2.yaxis.set_ticks_position('left')
ax2.yaxis.set_ticks([])
if feature_expected_value:
self._add_feature_exp_val(ax2, ax, xmin, xmax, data, feature,
lookback=lookback,
feature_name=feature_name)
if model_expected_value:
self._add_model_exp_val(ax2, ax, data)
if show_minima:
minina = self.model(data, **self.kwargs).min()
ax.axvline(minina, linestyle="--", color="r", lw=1)
if save:
lookback = lookback or ''
fname = os.path.join(self.path, f"pdp_{feature_name}_{lookback}")
plt.savefig(fname, bbox_inches="tight", dpi=400)
if show:
plt.show()
return ax
def _add_model_exp_val(self, ax, original_axis, data):
"""adds model expected value on a duplicate axis of ax"""
model_expected_val = self.model(data, **self.kwargs).mean()
ax2 = ax.twinx()
ymin, ymax = original_axis.get_ylim()
process_axis(ax2, ylim=(ymin, ymax),
yticks=[model_expected_val],
yticklabels=["E[f(x)]"],
right_spine=False,
top_spine=False,
tick_params=dict(length=0, labelsize=11)
)
original_axis.axhline(model_expected_val, color="#999999", zorder=-1,
linestyle="--", linewidth=1)
return
def _add_feature_exp_val(self, ax, original_axis, xmin, xmax, data,
feature,
feature_name=None,
lookback=None):
xv = self.xv(data=data, feature=feature, lookback=lookback)
mval = xv.mean()
ax3 = ax.twiny()
process_axis(ax3,
xlim=(xmin, xmax),
xticks=[mval], xticklabels=["E[" + feature_name + "]"],
tick_params={'length': 0, 'labelsize': 11}, top_spine=False,
right_spine=False)
original_axis.axvline(mval, color="#999999", zorder=-1, linestyle="--",
linewidth=1)
return
def process_axis(
ax: plt.Axes,
title=None, title_kws=None,
xlabel=None, xlabel_kws=None,
ylabel=None, ylabel_kws=None,
xticks=None, xticklabels=None,
yticks=None, yticklabels=None,
tick_params=None,
top_spine=None, right_spine=None, bottom_spine=None, left_spine=None,
xlim=None, ylim=None
):
"""processes a matplotlib axes"""
if title:
title_kws = title_kws or {}
ax.set_title(title, **title_kws)
if ylabel:
ylabel_kws = ylabel_kws or {}
ax.set_ylabel(ylabel, **ylabel_kws)
if xlabel:
xlabel_kws = xlabel_kws or {}
ax.set_xlabel(xlabel, **xlabel_kws)
if xlim is not None:
ax.set_xlim(xlim)
if ylim is not None:
ax.set_ylim(ylim)
if xticks:
ax.set_xticks(xticks)
if xticklabels is not None:
ax.set_xticklabels(xticklabels)
if yticks:
ax.set_yticks(yticks)
if yticklabels is not None:
ax.set_yticklabels(yticklabels)
if tick_params:
ax.tick_params(**tick_params)
if top_spine is False:
ax.spines['top'].set_visible(False)
elif top_spine is True:
ax.spines['top'].set_visible(True)
if right_spine is False:
ax.spines['right'].set_visible(False)
elif right_spine is True:
ax.spines['right'].set_visible(True)
if bottom_spine is False:
ax.spines['bottom'].set_visible(False)
if left_spine is False:
ax.spines['left'].set_visible(False)
return
def _add_dist_as_grid(fig: plt.Figure, hist_data, other_axes: plt.Axes,
xlabel=None, xlabel_kws=None, **plot_params):
"""Data point distribution plot for numeric feature"""
ax = fig.add_axes((0.1, 0.1, 0.8, 0.14), sharex=other_axes)
process_axis(ax, top_spine=False, xlabel=xlabel, xlabel_kws=xlabel_kws,
bottom_spine=False,
right_spine=False, left_spine=False)
ax.yaxis.set_visible(False) # hide the yaxis
ax.xaxis.set_visible(False) # hide the x-axis
color = plot_params.get('pdp_color', '#1A4E5D')
ax.plot(hist_data, [1] * len(hist_data), '|', color=color, markersize=20)
return
def _add_surface(ax, x0, x1, pd_vals, cmap, feature0, feature2, **kwargs):
ax.plot_surface(x0, x1, pd_vals, cmap=cmap, **kwargs)
ax.set_xlabel(feature0, fontsize=11)
ax.set_ylabel(feature2, fontsize=11)
ax.set_zlabel(f"E[f(x) | {feature0} {feature2} ]", fontsize=11)
return
def repeat(array, n:int):
# (len(array),) -> (len(array), n)
return np.tile(array, n).reshape(-1, len(array))