__all__ = ["DualAttentionModel", "InputAttentionModel"]
from easy_mpl import imshow
from .backend import tf, plt, np, os
from .backend import keras
from .functional import Model as FModel
from ai4water.utils.utils import print_something
from .utils.utils import DataNotFound
from ai4water.nn_tools import check_act_fn
from ai4water.preprocessing import DataSet
from ai4water.models._tensorflow.layer_definition import MyTranspose, MyDot
from ai4water.utils.utils import plot_activations_along_inputs
layers = keras.layers
KModel = keras.models.Model
class DALSTM(keras.layers.Layer):
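"""Placeholder for a subclassed-layer implementation of the dual-attention
LSTM. It is not functional yet; instantiating it raises NotImplementedError."""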
def __init__(
self,
enc_config: dict = None,
dec_config: dict = None,
drop_remainder: bool = True,
teacher_forcing: bool = False,
**kwargs
):
self.enc_config = enc_config
self.dec_config = dec_config
self.drop_remainder = drop_remainder
self.teacher_forcing = teacher_forcing
super().__init__(**kwargs)
raise NotImplementedError
class DualAttentionModel(FModel):
"""
This is the Dual-Attention LSTM model of Qin_ et al., 2017. The code is
adapted from this_ repository.
Example:
>>> from ai4water import DualAttentionModel
>>> from ai4water.datasets import busan_beach
>>> data = busan_beach()
>>> model = DualAttentionModel(lookback=5,
... input_features=data.columns.tolist()[0:-1],
... output_features=data.columns.tolist()[-1:])
If you do not wish to feed the previous observation of the target as input
to the model, you can set ``teacher_forcing`` to False. The ``drop_remainder``
argument must be set to True in such a case.
>>> model = DualAttentionModel(teacher_forcing=False, batch_size=4,
... drop_remainder=True, ts_args={'lookback': 5})
>>> model.fit(data=data)
.. _Qin:
https://arxiv.org/abs/1704.02971
.. _this:
https://github.com/chensvm/A-Dual-Stage-Attention-Based-Recurrent-Neural-Network-for-Time-Series-Prediction
"""
_enc_config = {'n_h': 20, # length of hidden state of encoder LSTM
'n_s': 20, # length of cell state of encoder LSTM
'm': 20, # length of hidden state of the LSTM following the encoder (eq 11 in paper)
'enc_lstm1_act': None,
'enc_lstm2_act': None,
}
# arguments for decoder/output attention in dual-stage attention
_dec_config = {
'p': 30,
'n_hde0': 30,
'n_sde0': 30
}
def __init__(
self,
enc_config: dict = None,
dec_config: dict = None,
teacher_forcing: bool = True,
**kwargs
):
"""
Arguments:
enc_config:
dictionary defining configuration of encoder/input attention. It must
have the following keys
- n_h: 20
- n_s: 20
- m: 20
- enc_lstm1_act: None
- enc_lstm2_act: None
dec_config:
dictionary defining configuration of decoder/output attention. It must
have the following three keys
- p: 30
- n_hde0: 30
- n_sde0: 30
teacher_forcing:
Whether to use the previous target/observation as input or not. If
yes, then the model will require two inputs. The first input will be
of shape (num_examples, lookback, num_inputs) while the second input
will be of shape (num_examples, lookback-1, 1). This second input is
supposed to be the target variable observed at previous time steps.
kwargs :
The keyword arguments for the [ai4water's Model][ai4water.Model] class
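Example:
A minimal sketch showing how a custom encoder/decoder configuration
can be passed (the values here are only illustrative). When a custom
dictionary is given, it must contain all the keys shown above:
>>> model = DualAttentionModel(
... enc_config={'n_h': 32, 'n_s': 32, 'm': 32,
... 'enc_lstm1_act': None, 'enc_lstm2_act': None},
... dec_config={'p': 16, 'n_hde0': 16, 'n_sde0': 16},
... ts_args={'lookback': 5})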
"""
self.method = 'dual_attention'
if enc_config is None:
enc_config = DualAttentionModel._enc_config
else:
assert isinstance(enc_config, dict)
if dec_config is None:
dec_config = DualAttentionModel._dec_config
else:
assert isinstance(dec_config, dict)
self.enc_config = enc_config
self.dec_config = dec_config
super(DualAttentionModel, self).__init__(teacher_forcing=teacher_forcing, **kwargs)
setattr(self, 'category', "DL")
def build(self, input_shape=None):
self.config['dec_config'] = self.dec_config
self.config['enc_config'] = self.enc_config
setattr(self, 'batch_size', self.config['batch_size'])
setattr(self, 'drop_remainder', self.config['drop_remainder'])
self.de_LSTM_cell = layers.LSTM(self.dec_config['p'], return_state=True, name='decoder_LSTM')
self.de_densor_We = layers.Dense(self.enc_config['m'])
if self.config['drop_remainder']:
h_de0 = tf.zeros((self.batch_size, self.dec_config['n_hde0']), name='dec_1st_hidden_state')
s_de0 = tf.zeros((self.batch_size, self.dec_config['n_sde0']), name='dec_1st_cell_state')
else:
h_de0 = layers.Input(shape=(self.dec_config['n_hde0'],), name='dec_1st_hidden_state')
s_de0 = layers.Input(shape=(self.dec_config['n_sde0'],), name='dec_1st_cell_state')
input_y = None
if self.teacher_forcing and self.drop_remainder:
input_y = layers.Input(batch_shape=(self.batch_size, self.lookback - 1, self.num_outs), name='input_y')
elif not self.drop_remainder:
input_y = layers.Input(shape=(self.lookback - 1, self.num_outs), name='input_y')
if self.drop_remainder:
enc_input = keras.layers.Input(batch_shape=(self.batch_size, self.lookback, self.num_ins), name='enc_input')
else:
enc_input = keras.layers.Input(shape=(self.lookback, self.num_ins), name='enc_input')
# _encoder returns (enc_lstm_out, h0, s0); unpack in that order so that the
# hidden (n_h) and cell (n_s) state inputs keep their intended shapes
enc_lstm_out, h0, s0 = self._encoder(enc_input, self.config['enc_config'])
# originally the last dimension was -1 but here it is set equal to 'm'
# eq 11 in paper
enc_out = layers.Reshape((self.lookback, self.enc_config['m']), name='enc_out_eq_11')(enc_lstm_out)
h, context = self.decoder_attention(enc_out, input_y, s_de0, h_de0)
h = layers.Reshape((self.num_outs, self.dec_config['p']))(h)
# concatenation of decoder hidden state and the context vector.
last_concat = layers.Concatenate(axis=2, name='last_concat')([h, context]) # (None, 1, 50)
# originally it was not defined but in tf.keras we need to define it explicitly
sec_dim = self.enc_config['m'] + self.dec_config['p']
last_reshape = layers.Reshape((sec_dim,), name='last_reshape')(last_concat) # (None, 50)
result = layers.Dense(self.dec_config['p'], name='eq_22')(last_reshape) # (None, 30) # equation 22
output = layers.Dense(self.num_outs)(result)
if self.forecast_len>1:
output = layers.Reshape(target_shape=(self.num_outs, self.forecast_len))(output)
initial_input = [enc_input]
if input_y is not None:
initial_input.append(input_y)
if self.config['drop_remainder']:
self._model = self.compile(model_inputs=initial_input, outputs=output)
else:
self._model = self.compile(model_inputs=initial_input + [s0, h0, s_de0, h_de0], outputs=output)
return
def _encoder(self, enc_inputs, config, lstm2_seq=True, suf: str = '1', s0=None, h0=None, num_ins=None):
if num_ins is None:
num_ins = self.num_ins
self.en_densor_We = layers.Dense(self.lookback, name='enc_We_'+suf)
_config, act_str = check_act_fn({'activation': config['enc_lstm1_act']})
self.en_LSTM_cell = layers.LSTM(config['n_h'], return_state=True, activation=_config['activation'],
name='encoder_LSTM_'+suf)
config['enc_lstm1_act'] = act_str
# initialize the first cell state
if s0 is None:
if self.drop_remainder:
s0 = tf.zeros((self.batch_size, config['n_s']), name=f'enc_first_cell_state_{suf}')
else:
s0 = layers.Input(shape=(config['n_s'],), name='enc_first_cell_state_' + suf)
# initialize the first hidden state
if h0 is None:
if self.drop_remainder:
h0 = tf.zeros((self.batch_size, config['n_h']), name=f'enc_first_hidden_state_{suf}')
else:
h0 = layers.Input(shape=(config['n_h'],), name='enc_first_hidden_state_' + suf)
enc_attn_out = self.encoder_attention(enc_inputs, s0, h0, num_ins, suf)
enc_lstm_in = layers.Reshape((self.lookback, num_ins), name='enc_lstm_input_'+suf)(enc_attn_out)
_config, act_str = check_act_fn({'activation': config['enc_lstm2_act']})
enc_lstm_out = layers.LSTM(config['m'], return_sequences=lstm2_seq, activation=_config['activation'],
name='LSTM_after_encoder_'+suf)(enc_lstm_in) # h_en_all
config['enc_lstm2_act'] = act_str
return enc_lstm_out, h0, s0
def one_encoder_attention_step(self, h_prev, s_prev, x, t, suf: str = '1'):
"""
:param h_prev: previous hidden state
:param s_prev: previous cell state
:param x: input of shape (T, n), where n is the number of input series and
T is the length of the time series (lookback)
:param t: time-step
:param suf: str, suffix to be attached to layer names
:return: attention weights of x_t; n values in total which sum to 1
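Roughly, this step computes equations 8 and 9 of Qin et al. (2017) as
implemented below (notation follows the paper):
e_t^k = v_e^T tanh(W_e [h_{t-1}; s_{t-1}] + U_e x^k)        (eq 8)
alpha_t^k = exp(e_t^k) / sum_i exp(e_t^i)                    (eq 9)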
"""
_concat = layers.Concatenate()([h_prev, s_prev]) # (none,1,2m)
result1 = self.en_densor_We(_concat) # (none,1,T)
result1 = layers.RepeatVector(x.shape[2],)(result1) # (none,n,T)
x_temp = MyTranspose(axis=(0, 2, 1))(x) # X_temp(None,n,T)
# (none,n,T) Ue(T,T), Ue * Xk in eq 8 of paper
result2 = MyDot(self.lookback, name='eq_8_mul_'+str(t)+'_'+suf)(x_temp)
result3 = layers.Add()([result1, result2]) # (none,n,T)
result4 = layers.Activation(activation='tanh')(result3) # (none,n,T)
result5 = MyDot(1)(result4)
result5 = MyTranspose(axis=(0, 2, 1), name='eq_8_' + str(t)+'_'+suf)(result5) # etk/ equation 8
alphas = layers.Activation(activation='softmax', name='eq_9_'+str(t)+'_'+suf)(result5) # equation 9
return alphas
def encoder_attention(self, _input, _s0, _h0, num_ins, suf: str = '1'):
s = _s0
_h = _h0
# attention weights of all timesteps will be concatenated along axis 1
attention_weight_t = None
for t in range(self.lookback):
_context = self.one_encoder_attention_step(_h, s, _input, t, suf=suf) # (none,1,n)
x = layers.Lambda(lambda x: _input[:, t, :])(_input)
x = layers.Reshape((1, num_ins))(x)
_h, _, s = self.en_LSTM_cell(x, initial_state=[_h, s])
if t != 0:
# attention_weight_t = layers.Merge(mode='concat', concat_axis=1,
# name='attn_weight_'+str(t))([attention_weight_t,
# _context])
attention_weight_t = layers.Concatenate(
axis=1,
name='attn_weight_'+str(t)+'_'+suf)([attention_weight_t, _context])
else:
attention_weight_t = _context
# get the driving input series
enc_output = layers.Multiply(name='enc_output_'+suf)([attention_weight_t, _input]) # equation 10 in paper
return enc_output
def one_decoder_attention_step(self, _h_de_prev, _s_de_prev, _h_en_all, t):
"""
:param _h_de_prev: previous hidden state of the decoder
:param _s_de_prev: previous cell state of the decoder
:param _h_en_all: encoder hidden states of all timesteps, shape (None, T, m)
:param t: int, timestep
:return: context vector (eq 14), computed from T attention weights which sum to 1
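Roughly, this step computes equations 12-14 of Qin et al. (2017) as
implemented below (notation follows the paper):
l_t^i = v_d^T tanh(W_d [d_{t-1}; s'_{t-1}] + U_d h_i)        (eq 12)
beta_t^i = exp(l_t^i) / sum_j exp(l_t^j)                      (eq 13)
c_t = sum_i beta_t^i h_i                                      (eq 14)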
"""
# concatenation of the previous hidden state and cell state of the LSTM unit in eq 12
_concat = layers.Concatenate(name='eq_12_'+str(t))([_h_de_prev, _s_de_prev]) # (None,1,2p)
result1 = self.de_densor_We(_concat) # (None,1,m)
result1 = layers.RepeatVector(self.lookback)(result1) # (None,T,m)
result2 = MyDot(self.enc_config['m'])(_h_en_all)
result3 = layers.Add()([result1, result2]) # (None,T,m)
result4 = layers.Activation(activation='tanh')(result3) # (None,T,m)
result5 = MyDot(1)(result4)
beta = layers.Activation(activation='softmax', name='eq_13_'+str(t))(result5) # equation 13
_context = layers.Dot(axes=1, name='eq_14_'+str(t))([beta, _h_en_all]) # (1,m) # equation 14 in paper
return _context
def decoder_attention(self, _h_en_all, _y, _s0, _h0):
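"""Runs the decoder LSTM with temporal (output) attention over the encoder
hidden states for ``lookback - 1`` steps and returns the final decoder hidden
state together with the final context vector."""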
s = _s0
_h = _h0
for t in range(self.lookback-1):
_context = self.one_decoder_attention_step(_h, s, _h_en_all, t) # (batch_size, 1, 20)
# if we want to use the true value of target of previous timestep as input then we will use _y
if self.teacher_forcing:
y_prev = layers.Lambda(lambda y_prev: _y[:, t, :])(_y) # (batch_size, lookback, 1) -> (batch_size, 1)
y_prev = layers.Reshape((1, self.num_outs))(y_prev) # -> (batch_size, 1, 1)
# concatenation of decoder input and computed context vector # ??
y_prev = layers.Concatenate(axis=2)([y_prev, _context]) # (None,1,21)
else:
y_prev = _context
y_prev = layers.Dense(self.num_outs, name='eq_15_'+str(t))(y_prev) # (None,1,1), Eq 15 in paper
_h, _, s = self.de_LSTM_cell(y_prev, initial_state=[_h, s]) # eq 16 ??
_context = self.one_decoder_attention_step(_h, s, _h_en_all, 'final')
return _h, _context
def fetch_data(self, x, y, source, data=None, **kwargs):
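"""Fetches data of the given ``source`` (training/validation/test) from the
data handler and appends the zero initial states which the model expects as
additional inputs when ``drop_remainder`` is False."""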
if self.teacher_forcing:
x, prev_y, labels = getattr(self.dh_, f'{source}_data')(**kwargs)
else:
x, labels = getattr(self.dh_, f'{source}_data')(**kwargs)
prev_y = None
n_s_feature_dim = self.enc_config['n_s']
n_h_feature_dim = self.enc_config['n_h']
p_feature_dim = self.dec_config['p']
if kwargs.get('use_datetime_index', False): # during deindexification, first feature will be removed.
n_s_feature_dim += 1
n_h_feature_dim += 1
p_feature_dim += 1
idx = np.expand_dims(x[:, 1:, 0], axis=-1) # extract the index from x
if self.use_true_prev_y:
prev_y = np.concatenate([prev_y, idx], axis=2) # insert index in prev_y
other_inputs = []
if not self.drop_remainder:
s0 = np.zeros((x.shape[0], n_s_feature_dim))
h0 = np.zeros((x.shape[0], n_h_feature_dim))
h_de0 = s_de0 = np.zeros((x.shape[0], p_feature_dim))
other_inputs = [s0, h0, s_de0, h_de0]
if self.teacher_forcing:
return [x, prev_y] + other_inputs, labels
else:
return [x] + other_inputs, labels
def training_data(self, x=None, y=None, data='training', key=None):
self._maybe_dh_not_set(data=data)
return self.fetch_data(x=x, y=y, source='training', data=data, key=key)
def validation_data(self, x=None, y=None, data='validation', **kwargs):
self._maybe_dh_not_set(data=data)
return self.fetch_data(x=x, y=y, source='validation', data=data, **kwargs)
def test_data(self, x=None, y=None, data='test', **kwargs):
self._maybe_dh_not_set(data=data)
return self.fetch_data(x=x, y=y, source='test', data=data, **kwargs)
def _maybe_dh_not_set(self, data):
"""if dh_ has not been set yet, try to create it using data argument if
possible"""
if isinstance(data, str) and data not in ['training', 'test', 'validation']:
self.dh_ = DataSet(data=data, **self.data_config)
elif not isinstance(data, str):
self.dh_ = DataSet(data=data, **self.data_config)
return
def interpret(
self,
data=None,
data_type='training',
**kwargs):
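"""Interprets the model by plotting the input attention weights of the final
encoder timestep along the input features."""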
return self.plot_act_along_inputs(
data=data,
layer_name=f'attn_weight_{self.lookback - 1}_1',
data_type=data_type,
**kwargs)
def get_attention_weights(
self,
layer_name: str=None,
x = None,
data = None,
data_type = 'training',
)->np.ndarray:
"""
Parameters
----------
layer_name : str, optional
the name of attention layer. If not given, the final attention
layer will be used.
x : optional
input data; if given, then ``data`` must not be given
data : optional
raw data from which the input (x) is extracted; if given, then
``x`` must not be given
data_type : str, optional
the data on which to make the forward pass to get attention weights.
Possible values are
- ``training``
- ``validation``
- ``test``
- ``all``
Returns
-------
a numpy array of shape (num_examples, lookback, num_ins)
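Examples
--------
A minimal usage sketch, assuming ``model`` has already been trained on
the busan_beach data:
>>> from ai4water.datasets import busan_beach
>>> data = busan_beach()
>>> aw = model.get_attention_weights(data=data, data_type='training')
>>> assert aw.ndim == 3  # (num_examples, lookback, num_ins)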
"""
if x is not None:
# default value
assert data_type in ("training", "test", "validation", "all")
layer_name = layer_name or f'attn_weight_{self.lookback - 1}_1'
assert isinstance(layer_name, str), f"""
layer_name must be a string, not of {layer_name.__class__.__name__} type
"""
from ai4water.postprocessing.visualize import Visualize
kwargs = {}
if self.config['drop_remainder']:
kwargs['batch_size'] = self.config['batch_size']
activation = Visualize(model=self).get_activations(
layer_names=layer_name,
x=x,
data=data,
data_type=data_type,
**kwargs)
activation = activation[layer_name] # (num_examples, lookback, num_ins)
return activation
def plot_act_along_lookback(self, activations, sample=0):
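"""Plots, for one example (``sample``), the attention weight of each input
feature along the lookback dimension."""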
assert isinstance(activations, np.ndarray)
activation = activations[sample, :, :]
act_t = activation.transpose()
fig, axis = plt.subplots()
for idx, _name in enumerate(self.input_features):
axis.plot(act_t[idx, :], label=_name)
axis.set_xlabel('Lookback')
axis.set_ylabel('Input attention weight')
axis.legend(loc="best")
plt.show()
return
def _fit_transform_x(self, x):
"""transforms x and puts the transformer in config with the key name"""
feature_names = [
self.input_features,
[f"{i}" for i in range(self.enc_config['n_s'])],
[f"{i}" for i in range(self.enc_config['n_h'])],
[f"{i}" for i in range(self.dec_config['n_hde0'])],
[f"{i}" for i in range(self.dec_config['n_sde0'])],
]
transformation = [self.config['x_transformation'], None, None, None, None]
if self.teacher_forcing:
feature_names.insert(1, self.output_features)
transformation.insert(1, self.config['y_transformation'])
return self._fit_transform(x, 'x_transformer_', transformation, feature_names)
def _fetch_data(self, source: str, x=None, y=None, data=None):
"""The main idea is that the user should be able to fully customize
training/test data by overriding the training_data and test_data methods.
However, if x is given or data is a DataSet, then the training_data/test_data
methods of this (Model) class will not be called."""
x, y, prefix, key, user_defined_x = super()._fetch_data(source, x, y, data)
if isinstance(x, np.ndarray):
if not self.config['drop_remainder']:
n_s_feature_dim = self.config['enc_config']['n_s']
n_h_feature_dim = self.config['enc_config']['n_h']
s0 = np.zeros((x.shape[0], n_s_feature_dim))
h0 = np.zeros((x.shape[0], n_h_feature_dim))
if self.__class__.__name__ == "DualAttentionModel":
p_feature_dim = self.dec_config['p']
h_de0 = s_de0 = np.zeros((x.shape[0], p_feature_dim))
x = [x, s0, h0, h_de0, s_de0]
else:
x = [x, s0, h0]
return x, y, prefix, key, user_defined_x
class InputAttentionModel(DualAttentionModel):
"""
InputAttentionModel is the same as DualAttentionModel with the output
attention/decoder part removed.
Example:
>>> from ai4water import InputAttentionModel
>>> from ai4water.datasets import busan_beach
>>> model = InputAttentionModel(
... input_features=busan_beach().columns.tolist()[0:-1],
... output_features=busan_beach().columns.tolist()[-1:])
>>> model.fit(data=busan_beach())
"""
def __init__(self, *args, teacher_forcing=False, **kwargs):
super(InputAttentionModel, self).__init__(*args, teacher_forcing=teacher_forcing, **kwargs)
def build(self, input_shape=None):
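"""Builds the input-attention model: an attention-weighted encoder LSTM
followed by a LeakyReLU activation and a Dense output layer."""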
self.config['enc_config'] = self.enc_config
setattr(self, 'batch_size', self.config['batch_size'])
setattr(self, 'drop_remainder', self.config['drop_remainder'])
setattr(self, 'method', 'input_attention')
print('building input attention model')
enc_input = keras.layers.Input(shape=(self.lookback, self.num_ins), name='enc_input1')
lstm_out, h0, s0 = self._encoder(enc_input, self.enc_config, lstm2_seq=False)
act_out = layers.LeakyReLU()(lstm_out)
predictions = layers.Dense(self.num_outs)(act_out)
if self.forecast_len>1:
predictions = layers.Reshape(target_shape=(self.num_outs, self.forecast_len))(predictions)
if self.verbosity > 2:
print('predictions: ', predictions)
inputs = [enc_input]
if not self.drop_remainder:
inputs = inputs + [s0, h0]
self._model = self.compile(model_inputs=inputs, outputs=predictions)
return
def fetch_data(self, source, x=None, y=None, data=None, **kwargs):
if x is None:
if isinstance(data, str):
if data in ("training", "test", "validation"):
if hasattr(self, 'dh_'):
data = getattr(self.dh_, f'{data}_data')(**kwargs)
else:
raise DataNotFound(source)
else:
raise ValueError(f"unrecognized value for data: {data}")
else:
dh = DataSet(data=data, **self.data_config)
setattr(self, 'dh_', dh)
data = getattr(dh, f'{source}_data')(**kwargs)
else:
data = x, y
if self.teacher_forcing:
x, prev_y, labels = data
else:
x, labels = data
n_s_feature_dim = self.config['enc_config']['n_s']
n_h_feature_dim = self.config['enc_config']['n_h']
if kwargs.get('use_datetime_index', False): # during deindexification, first feature will be removed.
n_s_feature_dim += 1
n_h_feature_dim += 1
idx = np.expand_dims(x[:, 1:, 0], axis=-1) # extract the index from x
if self.teacher_forcing:
prev_y = np.concatenate([prev_y, idx], axis=2) # insert index in prev_y
if not self.config['drop_remainder']:
s0 = np.zeros((x.shape[0], n_s_feature_dim))
h0 = np.zeros((x.shape[0], n_h_feature_dim))
x = [x, s0, h0]
if self.verbosity > 0:
print_something(x, "input_x")
print_something(labels, "target")
if self.teacher_forcing:
return [x, prev_y], labels
else:
return x, labels
def _fit_transform_x(self, x):
"""transforms x and puts the transformer in config with the key name.
For conformity we need to add feature names of the initial states; their
transformations will always be None.
"""
# x can be an array when the user does not provide input conditions!
if isinstance(x, list):
assert len(x) == 3
feature_names = [
self.input_features,
[f"{i}" for i in range(self.enc_config['n_s'])],
[f"{i}" for i in range(self.enc_config['n_h'])]
]
transformation = [self.config['x_transformation'], None, None]
return self._fit_transform(x, 'x_transformer_', transformation, feature_names)
else:
transformation = self.config['x_transformation']
return self._fit_transform(x, 'x_transformer_', transformation,
self.input_features)