Source code for ai4water.datasets.camels

import json
import glob
from typing import Union, List

from ._datasets import Datasets
from .utils import check_attributes, download, sanity_check, _unzip
from ai4water.utils.utils import dateandtime_now
from ai4water.backend import os, random, np, pd, xr

try:  # shapely may not be installed, as it may be difficult to isntall and is only needed for plotting data.
    from ai4water.preprocessing.spatial_utils import plot_shapefile
except ModuleNotFoundError:
    plot_shapefile = None

# directory separator
SEP = os.sep

def gb_message():
    link = ""
    raise ValueError(f"Dwonlaoad the data from {link} and provide the directory "
                     f"path as dataset=Camels(data=data)")

[docs]class Camels(Datasets): """ Get CAMELS dataset. This class first downloads the CAMELS dataset if it is not already downloaded. Then the selected attribute for a selected id are fetched and provided to the user using the method `fetch`. Attributes ----------- - ds_dir str/path: diretory of the dataset - dynamic_features list: tells which dynamic attributes are available in this dataset - static_features list: a list of static attributes. - static_attribute_categories list: tells which kinds of static attributes are present in this category. Methods --------- - stations : returns name/id of stations for which the data (dynamic attributes) exists as list of strings. - fetch : fetches all attributes (both static and dynamic type) of all station/gauge_ids or a speficified station. It can also be used to fetch all attributes of a number of stations ids either by providing their guage_id or by just saying that we need data of 20 stations which will then be chosen randomly. - fetch_dynamic_features : fetches speficied dynamic attributes of one specified station. If the dynamic attribute is not specified, all dynamic attributes will be fetched for the specified station. If station is not specified, the specified dynamic attributes will be fetched for all stations. - fetch_static_features : works same as `fetch_dynamic_features` but for `static` attributes. Here if the `category` is not specified then static attributes of the specified station for all categories are returned. stations : returns list of stations """ DATASETS = { 'CAMELS-BR': {'url': "", }, 'CAMELS-GB': {'url': gb_message}, }
[docs] def __init__(self, path=None, **kwargs): super(Camels, self).__init__(path=path, **kwargs) self.ds_dir = path
[docs] def stations(self): raise NotImplementedError
def _read_dynamic_from_csv(self, stations, dynamic_features, st=None, en=None)->dict: raise NotImplementedError
[docs] def fetch_static_features( self, stn_id: Union[str, list], features: Union[str, list] = None ): """Fetches all or selected static attributes of one or more stations. Parameters ---------- stn_id : str name/id of station of which to extract the data features : list/str, optional (default="all") The name/names of features to fetch. By default, all available static features are returned. Examples -------- >>> from ai4water.datasets import CAMELS_AUS >>> camels = CAMELS_AUS() >>> camels.fetch_static_features('224214A') >>> camels.static_features >>> camels.fetch_static_features('224214A', ... features=['elev_mean', 'relief', 'ksat', 'pop_mean']) """ raise NotImplementedError
@property def start(self): # start of data raise NotImplementedError @property def end(self): # end of data raise NotImplementedError @property def dynamic_features(self) -> list: raise NotImplementedError def _check_length(self, st, en): if st is None: st = self.start if en is None: en = self.end return st, en
[docs] def to_ts(self, static, st, en, as_ts=False, freq='D'): st, en = self._check_length(st, en) if as_ts: idx = pd.date_range(st, en, freq=freq) static = pd.DataFrame(np.repeat(static.values, len(idx), axis=0), index=idx, columns=static.columns) return static else: return static
@property def camels_dir(self): """Directory where all camels datasets will be saved. This will under datasets directory""" return os.path.join(self.base_ds_dir, "CAMELS") @property def ds_dir(self): """Directory where a particular dataset will be saved. """ return self._ds_dir @ds_dir.setter def ds_dir(self, x): if x is None: x = os.path.join(self.camels_dir, self.__class__.__name__) if not os.path.exists(x): os.makedirs(x) else: assert os.path.exists(x), f"No data exist at {x}" # sanity_check(, x) self._ds_dir = x
[docs] def fetch(self, stations: Union[str, list, int, float, None] = None, dynamic_features: Union[list, str, None] = 'all', static_features: Union[str, list, None] = None, st: Union[None, str] = None, en: Union[None, str] = None, as_dataframe: bool = False, **kwargs ) -> Union[dict, pd.DataFrame]: """ Fetches the attributes of one or more stations. Arguments: stations : if string, it is supposed to be a station name/gauge_id. If list, it will be a list of station/gauge_ids. If int, it will be supposed that the user want data for this number of stations/gauge_ids. If None (default), then attributes of all available stations. If float, it will be supposed that the user wants data of this fraction of stations. dynamic_features : If not None, then it is the attributes to be fetched. If None, then all available attributes are fetched static_features : list of static attributes to be fetches. None means no static attribute will be fetched. st : starting date of data to be returned. If None, the data will be returned from where it is available. en : end date of data to be returned. If None, then the data will be returned till the date data is available. as_dataframe : whether to return dynamic attributes as pandas dataframe or as xarray dataset. kwargs : keyword arguments to read the files returns: If both static and dynamic features are obtained then it returns a dictionary whose keys are station/gauge_ids and values are the attributes and dataframes. Otherwise either dynamic or static features are returned. Examples -------- >>> dataset = CAMELS_AUS() >>> # get data of 10% of stations >>> df = dataset.fetch(stations=0.1, as_dataframe=True) # returns a multiindex dataframe ... # fetch data of 5 (randomly selected) stations >>> df = dataset.fetch(stations=5, as_dataframe=True) ... # fetch data of 3 selected stations >>> df = dataset.fetch(stations=['912101A','912105A','915011A'], as_dataframe=True) ... # fetch data of a single stations >>> df = dataset.fetch(stations='318076', as_dataframe=True) ... # get both static and dynamic features as dictionary >>> data = dataset.fetch(1, static_features="all", as_dataframe=True) # -> dict >>> data['dynamic'] ... # get only selected dynamic features >>> df = dataset.fetch(stations='318076', ... dynamic_features=['streamflow_MLd', 'solarrad_AWAP'], as_dataframe=True) ... # fetch data between selected periods >>> df = dataset.fetch(stations='318076', st="20010101", en="20101231", as_dataframe=True) """ if isinstance(stations, int): # the user has asked to randomly provide data for some specified number of stations stations = random.sample(self.stations(), stations) elif isinstance(stations, list): pass elif isinstance(stations, str): stations = [stations] elif isinstance(stations, float): num_stations = int(len(self.stations()) * stations) stations = random.sample(self.stations(), num_stations) elif stations is None: # fetch for all stations stations = self.stations() else: raise TypeError(f"Unknown value provided for stations {stations}") if xr is None: raise ModuleNotFoundError("modeule xarray must be installed to use `datasets` module") return self.fetch_stations_attributes( stations, dynamic_features, static_features, st=st, en=en, as_dataframe=as_dataframe, **kwargs )
def _maybe_to_netcdf(self, fname: str): self.dyn_fname = os.path.join(self.ds_dir, f'{fname}.nc') if not os.path.exists(self.dyn_fname): # saving all the data in netCDF file using xarray print(f'converting data to netcdf format for faster io operations') data = self.fetch(static_features=None) data_vars = {} coords = {} for k, v in data.items(): data_vars[k] = (['time', 'dynamic_features'], v) index = v.index = 'time' coords = { 'dynamic_features': list(v.columns), 'time': index } xds = xr.Dataset( data_vars=data_vars, coords=coords, attrs={'date': f"create on {dateandtime_now()}"} ) xds.to_netcdf(self.dyn_fname)
[docs] def fetch_stations_attributes( self, stations: list, dynamic_features='all', static_features=None, st=None, en=None, as_dataframe: bool = False, **kwargs ): """Reads attributes of more than one stations. Arguments: stations : list of stations for which data is to be fetched. dynamic_features : list of dynamic attributes to be fetched. if 'all', then all dynamic attributes will be fetched. static_features : list of static attributes to be fetched. If `all`, then all static attributes will be fetched. If None, then no static attribute will be fetched. st : start of data to be fetched. en : end of data to be fetched. as_dataframe : whether to return the data as pandas dataframe. default is xr.dataset object kwargs dict: additional keyword arguments Returns: Dynamic and static features of multiple stations. Dynamic features are by default returned as xr.Dataset unless `as_dataframe` is True, in such a case, it is a pandas dataframe with multiindex. If xr.Dataset, it consists of `data_vars` equal to number of stations and for each station, the `DataArray` is of dimensions (time, dynamic_features). where `time` is defined by `st` and `en` i.e length of `DataArray`. In case, when the returned object is pandas DataFrame, the first index is `time` and second index is `dyanamic_features`. Static attributes are always returned as pandas DataFrame and have following shape `(stations, static_features). If `dynamic_features` is None, then they are not returned and the returned value only consists of static features. Same holds true for `static_features`. If both are not None, then the returned type is a dictionary with `static` and `dynamic` keys. Raises: ValueError, if both dynamic_features and static_features are None Examples -------- >>> from ai4water.datasets import CAMELS_AUS >>> dataset = CAMELS_AUS() ... # find out station ids >>> dataset.stations() ... # get data of selected stations >>> dataset.fetch_stations_attributes(['912101A', '912105A', '915011A'], ... as_dataframe=True) """ st, en = self._check_length(st, en) if dynamic_features is not None: dynamic_features = check_attributes(dynamic_features, self.dynamic_features) if not os.path.exists(self.dyn_fname): # read from csv files # following code will run only once when fetch is called inside init method dyn = self._read_dynamic_from_csv(stations, dynamic_features, st=st, en=en) else: dyn = xr.load_dataset(self.dyn_fname) # daataset dyn = dyn[stations].sel(dynamic_features=dynamic_features, time=slice(st, en)) if as_dataframe: dyn = dyn.to_dataframe(['time', 'dynamic_features']) if static_features is not None: static = self.fetch_static_features(stations, static_features) stns = {'dynamic': dyn, 'static': static} else: stns = dyn elif static_features is not None: return self.fetch_static_features(stations, static_features) else: raise ValueError return stns
[docs] def fetch_dynamic_features( self, stn_id: str, features='all', st=None, en=None, as_dataframe=False ): """Fetches all or selected dynamic attributes of one station. Parameters ---------- stn_id : str name/id of station of which to extract the data features : list/str, optional (default="all") The name/names of features to fetch. By default, all available dynamic features are returned. st : Optional (default=None) start time from where to fetch the data. en : Optional (default=None) end time untill where to fetch the data as_dataframe : bool, optional (default=False) if true, the returned data is pandas DataFrame otherwise it is xarray dataset Examples -------- >>> from ai4water.datasets import CAMELS_AUS >>> camels = CAMELS_AUS() >>> camels.fetch_dynamic_features('224214A', as_dataframe=True).unstack() >>> camels.dynamic_features >>> camels.fetch_dynamic_features('224214A', ... attributes=['tmax_AWAP', 'vprp_AWAP', 'streamflow_mmd'], ... as_dataframe=True).unstack() """ assert isinstance(stn_id, str), f"station id must be string is is of type {type(stn_id)}" station = [stn_id] return self.fetch_stations_attributes( station, features, None, st=st, en=en, as_dataframe=as_dataframe )
[docs] def fetch_station_attributes( self, station: str, dynamic_features: Union[str, list, None] = 'all', static_features: Union[str, list, None] = None, as_ts: bool = False, st: Union[str, None] = None, en: Union[str, None] = None, **kwargs ) -> pd.DataFrame: """ Fetches attributes for one station. Parameters ----------- station : station id/gauge id for which the data is to be fetched. dynamic_features : str/list, optional names of dynamic features/attributes to fetch static_features : names of static features/attributes to be fetches as_ts : bool whether static attributes are to be converted into a time series or not. If yes then the returned time series will be of same length as that of dynamic attribtues. st : str,optional starting point from which the data to be fetched. By default the data will be fetched from where it is available. en : str, optional end point of data to be fetched. By default the dat will be fetched Returns ------- pd.DataFrame dataframe if as_ts is True else it returns a dictionary of static and dynamic attributes for a station/gauge_id Examples -------- >>> from ai4water.datasets import CAMELS_AUS >>> dataset = CAMELS_AUS() >>> dataset.fetch_station_attributes('912101A') """ st, en = self._check_length(st, en) station_df = pd.DataFrame() if dynamic_features: dynamic = self.fetch_dynamic_features(station, dynamic_features, st=st, en=en, **kwargs) station_df = pd.concat([station_df, dynamic]) if static_features is not None: static = self.fetch_static_features(station, static_features) if as_ts: station_df = pd.concat([station_df, static], axis=1) else: station_df = {'dynamic': station_df, 'static': static} elif static_features is not None: station_df = self.fetch_static_features(station, static_features) return station_df
[docs]class LamaH(Camels): """ Large-Sample Data for Hydrology and Environmental Sciences for Central Europe from Zenodo_ following the work of Klingler_ et al., 2021 . .. _Zenodo: .. _Klingler: """ url = "" _data_types = ['total_upstrm', 'diff_upstrm_all', 'diff_upstrm_lowimp'] time_steps = ['daily', 'hourly'] static_attribute_categories = ['']
[docs] def __init__(self, *, time_step: str, data_type: str, **kwargs ): """ Parameters ---------- time_step : possible values are ``daily`` or ``hourly`` data_type : possible values are ``total_upstrm``, ``diff_upstrm_all`` or `diff_upstrm_lowimp` Examples -------- >>> from ai4water.datasets import LamaH >>> dataset = LamaH(time_step='daily', data_type='total_upstrm') >>> df = dataset.fetch(3, as_dataframe=True) """ assert time_step in self.time_steps, f"invalid time_step {time_step} given" assert data_type in self._data_types, f"invalid data_type {data_type} given." self.time_step = time_step self.data_type = data_type super().__init__(**kwargs) fpath = os.path.join(self.ds_dir, '') _data_types = self._data_types if self.time_step == 'daily' else ['total_upstrm'] if not os.path.exists(fpath): for dt in _data_types: for ts in self.time_steps: self.time_step = ts self.data_type = dt fname = f"lamah_{dt}_{ts}_dyn" self._maybe_to_netcdf(fname) self.time_step = time_step self.data_type = data_type self.dyn_fname = os.path.join(self.ds_dir, f'lamah_{data_type}_{time_step}')
@property def dynamic_features(self): station = self.stations()[0] df = self.read_ts_of_station(station) return df.columns.to_list() @property def static_features(self) -> list: fname = os.path.join(self.data_type_dir, f'1_attributes{SEP}Catchment_attributes.csv') df = pd.read_csv(fname, sep=';', index_col='ID') return df.columns.to_list() @property def data_type_dir(self): directory = 'CAMELS_AT' if self.time_step == 'hourly': directory = 'CAMELS_AT1' # todo, use it only for hourly, daily is causing errors # self.ds_dir/CAMELS_AT/data_type_dir f = [f for f in os.listdir(os.path.join(self.ds_dir, directory)) if self.data_type in f][0] return os.path.join(self.ds_dir, f'{directory}{SEP}{f}')
[docs] def stations(self) -> list: # assuming file_names of the format ID_{stn_id}.csv _dirs = os.listdir(os.path.join(self.data_type_dir, f'2_timeseries{SEP}{self.time_step}')) s = [f.split('_')[1].split('.csv')[0] for f in _dirs] return s
def _read_dynamic_from_csv(self, stations, dynamic_features: Union[str, list] = 'all', st=None, en=None, ): """Reads attributes of one station""" stations_attributes = {} for station in stations: station_df = pd.DataFrame() if dynamic_features is not None: dynamic_df = self.read_ts_of_station(station) station_df = pd.concat([station_df, dynamic_df]) stations_attributes[station] = station_df return stations_attributes
[docs] def fetch_static_features( self, stn_id: Union[str, List[str]], features:Union[str, List[str]]=None ) -> pd.DataFrame: """ static features of LamaH Parameters ---------- stn_id : str name/id of station of which to extract the data features : list/str, optional (default="all") The name/names of features to fetch. By default, all available static features are returned. Examples -------- >>> from ai4water.datasets import LamaH >>> dataset = LamaH(time_step='daily', data_type='total_upstrm') >>> df = dataset.fetch_static_features('99') # (1, 61) ... # get list of all static features >>> dataset.static_features >>> dataset.fetch_static_features('99', >>> features=['area_calc', 'elev_mean', 'agr_fra', 'sand_fra']) # (1, 4) """ fname = os.path.join(self.data_type_dir, f'1_attributes{SEP}Catchment_attributes.csv') df = pd.read_csv(fname, sep=';', index_col='ID') # if features is not None: static_features = check_attributes(features, self.static_features) df = df[static_features] if isinstance(stn_id, list): stations = [str(i) for i in stn_id] elif isinstance(stn_id, int): stations = str(stn_id) else: stations = stn_id df.index = df.index.astype(str) df = df.loc[stations] if isinstance(df, pd.Series): df = pd.DataFrame(df).transpose() return df
[docs] def read_ts_of_station(self, station) -> pd.DataFrame: # read a file containing timeseries data for one station fname = os.path.join(self.data_type_dir, f'2_timeseries{SEP}{self.time_step}{SEP}ID_{station}.csv') df = pd.read_csv(fname, sep=';') if self.time_step == 'daily': periods = pd.PeriodIndex(year=df["YYYY"], month=df["MM"], day=df["DD"], freq="D") df.index = periods.to_timestamp() else: periods = pd.PeriodIndex(year=df["YYYY"], month=df["MM"], day=df["DD"], hour=df["hh"], minute=df["mm"], freq="H") df.index = periods.to_timestamp() # remove the cols specifying index [df.pop(item) for item in ['YYYY', 'MM', 'DD', 'hh', 'mm'] if item in df] return df
@property def start(self): return "19810101" @property def end(self): return "20191231"
[docs]class HYSETS(Camels): """ database for hydrometeorological modeling of 14,425 North American watersheds from 1950-2018 following the work of `Arsenault et al., 2020 <>`_ The user must manually download the files, unpack them and provide the `path` where these files are saved. This data comes with multiple sources. Each source having one or more dynamic_features Following data_source are available. +---------------+------------------------------+ |sources | dynamic_features | |===============|==============================| |SNODAS_SWE | dscharge, swe | +---------------+------------------------------+ |SCDNA | discharge, pr, tasmin, tasmax| +---------------+------------------------------+ |nonQC_stations | discharge, pr, tasmin, tasmax| +---------------+------------------------------+ |Livneh | discharge, pr, tasmin, tasmax| +---------------+------------------------------+ |ERA5 | discharge, pr, tasmax, tasmin| +---------------+------------------------------+ |ERAS5Land_SWE | discharge, swe | +---------------+------------------------------+ |ERA5Land | discharge, pr, tasmax, tasmin| +---------------+------------------------------+ all sources contain one or more following dynamic_features with following shapes +----------------------------+------------------+ |dynamic_features | shape | |============================|==================| |time | (25202,) | +----------------------------+------------------+ |watershedID | (14425,) | +----------------------------+------------------+ |drainage_area | (14425,) | +----------------------------+------------------+ |drainage_area_GSIM | (14425,) | +----------------------------+------------------+ |flag_GSIM_boundaries | (14425,) | +----------------------------+------------------+ |flag_artificial_boundaries | (14425,) | +----------------------------+------------------+ |centroid_lat | (14425,) | +----------------------------+------------------+ |centroid_lon | (14425,) | +----------------------------+------------------+ |elevation | (14425,) | +----------------------------+------------------+ |slope | (14425,) | +----------------------------+------------------+ |discharge | (14425, 25202) | +----------------------------+------------------+ |pr | (14425, 25202) | +----------------------------+------------------+ |tasmax | (14425, 25202) | +----------------------------+------------------+ |tasmin | (14425, 25202) | +----------------------------+------------------+ Examples -------- >>> from ai4water.datasets import HYSETS >>> dataset = HYSETS(path="path/to/HYSETS") ... # fetch data of a random station >>> df = dataset.fetch(1, as_dataframe=True) >>> df.shape (25202, 5) >>> stations = dataset.stations() >>> len(stations) 14425 >>> df = dataset.fetch('999', as_dataframe=True) >>> df.unstack().shape (25202, 5) """ doi = "" url = "" Q_SRC = ['ERA5', 'ERA5Land', 'ERA5Land_SWE', 'Livneh', 'nonQC_stations', 'SCDNA', 'SNODAS_SWE'] SWE_SRC = ['ERA5Land_SWE', 'SNODAS_SWE'] OTHER_SRC = [src for src in Q_SRC if src not in ['ERA5Land_SWE', 'SNODAS_SWE']] dynamic_features = ['discharge', 'swe', 'tasmin', 'tasmax', 'pr']
[docs] def __init__(self, path: str, swe_source: str = "SNODAS_SWE", discharge_source: str = "ERA5", tasmin_source: str = "ERA5", tasmax_source: str = "ERA5", pr_source: str = "ERA5", **kwargs ): """ Arguments: path : path where all the data files are saved. swe_source : source of swe data. discharge_source : source of discharge data tasmin_source : source of tasmin data tasmax_source : source of tasmax data pr_source : source of pr data kwargs : arguments for `Camels` base class """ assert swe_source in self.SWE_SRC, f'source must be one of {self.SWE_SRC}' assert discharge_source in self.Q_SRC, f'source must be one of {self.Q_SRC}' assert tasmin_source in self.OTHER_SRC, f'source must be one of {self.OTHER_SRC}' assert tasmax_source in self.OTHER_SRC, f'source must be one of {self.OTHER_SRC}' assert pr_source in self.OTHER_SRC, f'source must be one of {self.OTHER_SRC}' self.sources = { 'swe': swe_source, 'discharge': discharge_source, 'tasmin': tasmin_source, 'tasmax': tasmax_source, 'pr': pr_source } super().__init__(**kwargs) self.ds_dir = path fpath = os.path.join(self.ds_dir, '') if not os.path.exists(fpath): self._maybe_to_netcdf('hysets_dyn')
def _maybe_to_netcdf(self, fname: str): # todo saving as one file takes very long time oneD_vars = [] twoD_vars = [] for src in self.Q_SRC: xds = xr.open_dataset(os.path.join(self.ds_dir, f'HYSETS_2020_{src}.nc')) for var in xds.variables: print(f'getting {var} from source {src} ') if len(xds[var].data.shape) > 1: xar = xds[var] = f"{}_{src}" twoD_vars.append(xar) else: xar = xds[var] = f"{}_{src}" oneD_vars.append(xar) oneD_xds = xr.merge(oneD_vars) twoD_xds = xr.merge(twoD_vars) oneD_xds.to_netcdf(os.path.join(self.ds_dir, "")) twoD_xds.to_netcdf(os.path.join(self.ds_dir, "")) return @property def ds_dir(self): return self._ds_dir @ds_dir.setter def ds_dir(self, x): sanity_check('HYSETS', x) self._ds_dir = x @property def static_features(self)->list: df = self.read_static_data() return df.columns.to_list()
[docs] def stations(self) -> List[str]: """ Returns ------- list a list of ids of stations Examples -------- >>> dataset = HYSETS() ... # get name of all stations as list >>> dataset.stations() """ return self.read_static_data().index.to_list()
@property def start(self)->str: return "19500101" @property def end(self)->str: return "20181231"
[docs] def fetch_stations_attributes( self, stations: list, dynamic_features: Union[str, list, None] = 'all', static_features: Union[str, list, None] = None, st=None, en=None, as_dataframe: bool = False, **kwargs ): """returns attributes of multiple stations Examples -------- >>> from ai4water.datasets import HYSETS >>> dataset = HYSETS() >>> stations = dataset.stations()[0:3] >>> attributes = dataset.fetch_stations_attributes(stations) """ stations = check_attributes(stations, self.stations()) stations = [int(stn) for stn in stations] if dynamic_features is not None: dyn = self._fetch_dynamic_features(stations=stations, dynamic_features=dynamic_features, as_dataframe=as_dataframe, st=st, en=en, **kwargs ) if static_features is not None: # we want both static and dynamic to_return = {} static = self._fetch_static_features(station=stations, static_features=static_features, st=st, en=en, **kwargs ) to_return['static'] = static to_return['dynamic'] = dyn else: to_return = dyn elif static_features is not None: # we want only static to_return = self._fetch_static_features( station=stations, static_features=static_features, **kwargs ) else: raise ValueError return to_return
[docs] def fetch_dynamic_features( self, stn_id, features='all', st=None, en=None, as_dataframe=False ): """Fetches dynamic attributes of one station. Examples -------- >>> from ai4water.datasets import HYSETS >>> dataset = HYSETS() >>> dyn_features = dataset.fetch_dynamic_features('station_name') """ station = [int(stn_id)] return self._fetch_dynamic_features( stations=station, dynamic_features=features, st=st, en=en, as_dataframe=as_dataframe )
def _fetch_dynamic_features( self, stations: list, dynamic_features='all', st=None, en=None, as_dataframe=False, as_ts=False ): """Fetches dynamic attributes of station.""" st, en = self._check_length(st, en) attrs = check_attributes(dynamic_features, self.dynamic_features) stations = np.subtract(stations, 1).tolist() # maybe we don't need to read all variables sources = {k: v for k, v in self.sources.items() if k in attrs} # original .nc file contains datasets with dynamic and static features as data_vars # however, for uniformity of this API and easy usage, we want a Dataset to have # station names/gauge_ids as data_vars and each data_var has # dimension (time, dynamic_variables) # Therefore, first read all data for each station from .nc file # then rearrange it. # todo, this operation is slower because of `to_dataframe` # also doing this removes all the metadata x = {} f = os.path.join(self.ds_dir, "") xds = xr.open_dataset(f) for stn in stations: xds1 = xds[[f'{k}_{v}' for k, v in sources.items()]].sel(watershed=stn, time=slice(st, en)) xds1 = xds1.rename_vars({f'{k}_{v}': k for k, v in sources.items()}) x[stn] = xds1.to_dataframe(['time']) xds = xr.Dataset(x) xds = xds.rename_dims({'dim_1': 'dynamic_features'}) xds = xds.rename_vars({'dim_1': 'dynamic_features'}) if as_dataframe: return xds.to_dataframe(['time', 'dynamic_features']) return xds def _fetch_static_features( self, station, static_features: Union[str, list] = 'all', st=None, en=None, as_ts=False ): df = self.read_static_data() static_features = check_attributes(static_features, self.static_features) if isinstance(station, str): station = [station] elif isinstance(station, int): station = [str(station)] elif isinstance(station, list): station = [str(stn) for stn in station] else: raise ValueError return self.to_ts(df.loc[station][static_features], st=st, en=en, as_ts=as_ts)
[docs] def fetch_static_features( self, stn_id: Union[str, List[str]], features:Union[str, List[str]]="all", st=None, en=None, as_ts=False ) -> pd.DataFrame: """returns static atttributes of one or multiple stations Parameters ---------- stn_id : str name/id of station of which to extract the data features : list/str, optional (default="all") The name/names of features to fetch. By default, all available static features are returned. Examples --------- >>> from ai4water.datasets import HYSETS >>> dataset = HYSETS() get the names of stations >>> stns = dataset.stations() >>> len(stns) 14425 get all static data of all stations >>> static_data = dataset.fetch_static_features(stns) >>> static_data.shape (14425, 28) get static data of one station only >>> static_data = dataset.fetch_static_features('991') >>> static_data.shape (1, 28) get the names of static features >>> dataset.static_features get only selected features of all stations >>> static_data = dataset.fetch_static_features(stns, ['Drainage_Area_km2', 'Elevation_m']) >>> static_data.shape (14425, 2) """ return self._fetch_static_features(stn_id, features, st, en, as_ts)
[docs] def read_static_data(self): fname = os.path.join(self.ds_dir, 'HYSETS_watershed_properties.txt') static_df = pd.read_csv(fname, index_col='Watershed_ID', sep=';') static_df.index = static_df.index.astype(str) return static_df
[docs]class CAMELS_US(Camels): """ Downloads and processes CAMELS dataset of 671 catchments named as CAMELS from following Newman et al., 2015 [1]_ Examples -------- >>> from ai4water.datasets import CAMELS_US >>> dataset = CAMELS_US(path=r'F:\data\CAMELS\CAMELS_US') >>> df = dataset.fetch(stations=1, as_dataframe=True) >>> df = df.unstack() # the returned dataframe is a multi-indexed dataframe so we have to unstack it >>> df.shape (12784, 8) # get name of all stations as list >>> stns = dataset.stations() >>> len(stns) 671 # get data by station id >>> df = dataset.fetch(stations='11478500', as_dataframe=True).unstack() >>> df.shape (12784, 8) # get names of available dynamic features >>> dataset.dynamic_features # get only selected dynamic features >>> df = dataset.fetch(1, as_dataframe=True, ... dynamic_features=['prcp(mm/day)', 'srad(W/m2)', 'tmax(C)', 'tmin(C)', 'Flow']).unstack() >>> df.shape (12784, 5) # get names of available static features >>> dataset.static_features # get data of 10 random stations >>> df = dataset.fetch(10, as_dataframe=True) >>> df.shape (102272, 10) # remember this is multi-indexed DataFrame .. [1]_ """ DATASETS = ['CAMELS_US'] url = "" catchment_attr_url = "" folders = {'basin_mean_daymet': f'basin_mean_forcing{SEP}daymet', 'basin_mean_maurer': f'basin_mean_forcing{SEP}maurer', 'basin_mean_nldas': f'basin_mean_forcing{SEP}nldas', 'basin_mean_v1p15_daymet': f'basin_mean_forcing{SEP}v1p15{SEP}daymet', 'basin_mean_v1p15_nldas': f'basin_mean_forcing{SEP}v1p15{SEP}nldas', 'elev_bands': f'elev{SEP}daymet', 'hru': f'hru_forcing{SEP}daymet'} dynamic_features = ['dayl(s)', 'prcp(mm/day)', 'srad(W/m2)', 'swe(mm)', 'tmax(C)', 'tmin(C)', 'vp(Pa)', 'Flow']
[docs] def __init__(self, data_source='basin_mean_daymet', path=None): assert data_source in self.folders, f'allwed data sources are {self.folders.keys()}' self.data_source = data_source super().__init__(path=path, name="CAMELS_US") self.ds_dir = path if os.path.exists(self.ds_dir): print(f"dataset is already downloaded at {self.ds_dir}") else: download(self.url, os.path.join(self.camels_dir, f'CAMELS_US{SEP}')) download(self.catchment_attr_url, os.path.join(self.camels_dir, f"CAMELS_US{SEP}")) _unzip(self.ds_dir) self.attr_dir = os.path.join(self.ds_dir, f'catchment_attrs{SEP}camels_attributes_v2.0') self.dataset_dir = os.path.join(self.ds_dir, f'CAMELS_US{SEP}basin_dataset_public_v1p2') self._maybe_to_netcdf('camels_us_dyn')
@property def start(self): return "19800101" @property def end(self): return "20141231" @property def static_features(self): static_fpath = os.path.join(self.ds_dir, 'static_features.csv') if not os.path.exists(static_fpath): files = glob.glob(f"{os.path.join(self.ds_dir, 'catchment_attrs', 'camels_attributes_v2.0')}/*.txt") cols = [] for f in files: _df = pd.read_csv(f, sep=';', index_col='gauge_id', nrows=1) cols += list(_df.columns) else: df = pd.read_csv(static_fpath, index_col='gauge_id', nrows=1) cols = list(df.columns) return cols
[docs] def stations(self) -> list: stns = [] for _dir in os.listdir(os.path.join(self.dataset_dir, 'usgs_streamflow')): cat = os.path.join(self.dataset_dir, f'usgs_streamflow{SEP}{_dir}') stns += [fname.split('_')[0] for fname in os.listdir(cat)] # remove stations for which static values are not available for stn in ['06775500', '06846500', '09535100']: stns.remove(stn) return stns
def _read_dynamic_from_csv(self, stations, dynamic_features: Union[str, list] = 'all', st=None, en=None, ): dyn = {} for station in stations: # attributes = check_attributes(dynamic_features, self.dynamic_features) assert isinstance(station, str) df = None df1 = None dir_name = self.folders[self.data_source] for cat in os.listdir(os.path.join(self.dataset_dir, dir_name)): cat_dirs = os.listdir(os.path.join(self.dataset_dir, f'{dir_name}{SEP}{cat}')) stn_file = f'{station}_lump_cida_forcing_leap.txt' if stn_file in cat_dirs: df = pd.read_csv(os.path.join(self.dataset_dir, f'{dir_name}{SEP}{cat}{SEP}{stn_file}'), sep="\s+|;|:", skiprows=4, engine='python', names=['Year', 'Mnth', 'Day', 'Hr', 'dayl(s)', 'prcp(mm/day)', 'srad(W/m2)', 'swe(mm)', 'tmax(C)', 'tmin(C)', 'vp(Pa)'], ) df.index = pd.to_datetime( df['Year'].map(str) + '-' + df['Mnth'].map(str) + '-' + df['Day'].map(str)) flow_dir = os.path.join(self.dataset_dir, 'usgs_streamflow') for cat in os.listdir(flow_dir): cat_dirs = os.listdir(os.path.join(flow_dir, cat)) stn_file = f'{station}_streamflow_qc.txt' if stn_file in cat_dirs: fpath = os.path.join(flow_dir, f'{cat}{SEP}{stn_file}') df1 = pd.read_csv(fpath, sep="\s+|;|:'", names=['station', 'Year', 'Month', 'Day', 'Flow', 'Flag'], engine='python') df1.index = pd.to_datetime( df1['Year'].map(str) + '-' + df1['Month'].map(str) + '-' + df1['Day'].map(str)) out_df = pd.concat([df[['dayl(s)', 'prcp(mm/day)', 'srad(W/m2)', 'swe(mm)', 'tmax(C)', 'tmin(C)', 'vp(Pa)']], df1['Flow']], axis=1) dyn[station] = out_df return dyn
[docs] def fetch_static_features( self, stn_id: Union[str, List[str]], features:Union[str, List[str]]=None ): """ gets one or more static features of one or more stations Parameters ---------- stn_id : str name/id of station of which to extract the data features : list/str, optional (default="all") The name/names of features to fetch. By default, all available static features are returned. Examples -------- >>> from ai4water.datasets import CAMELS_US >>> camels = CAMELS_US() >>> st_data = camels.fetch_static_features('11532500') >>> st_data.shape (1, 59) get names of available static features >>> camels.static_features get specific features of one station >>> static_data = camels.fetch_static_features('11528700', >>> features=['area_gages2', 'geol_porostiy', 'soil_conductivity', 'elev_mean']) >>> static_data.shape (1, 4) get names of allstations >>> all_stns = camels.stations() >>> len(all_stns) 671 >>> all_static_data = camels.fetch_static_features(all_stns) >>> all_static_data.shape (671, 59) """ attributes = check_attributes(features, self.static_features) static_fpath = os.path.join(self.ds_dir, 'static_features.csv') if not os.path.exists(static_fpath): files = glob.glob(f"{os.path.join(self.ds_dir, 'catchment_attrs', 'camels_attributes_v2.0')}/*.txt") static_df = pd.DataFrame() for f in files: # index should be read as string idx = pd.read_csv(f, sep=';', usecols=['gauge_id'], dtype=str) _df = pd.read_csv(f, sep=';', index_col='gauge_id') _df.index = idx['gauge_id'] static_df = pd.concat([static_df, _df], axis=1) static_df.to_csv(static_fpath, index_label='gauge_id') else: # index should be read as string bcs it has 0s at the start idx = pd.read_csv(static_fpath, usecols=['gauge_id'], dtype=str) static_df = pd.read_csv(static_fpath, index_col='gauge_id') static_df.index = idx['gauge_id'] static_df.index = static_df.index.astype(str) df = static_df.loc[stn_id][attributes] if isinstance(df, pd.Series): df = pd.DataFrame(df).transpose() return df
[docs]class CAMELS_BR(Camels): """ Downloads and processes CAMELS dataset of Brazil Examples -------- >>> from ai4water.datasets import CAMELS_BR >>> dataset = CAMELS_BR(path=r'F:\data\CAMELS\CAMELS_BR') >>> df = dataset.fetch(stations=1, as_dataframe=True) >>> df = df.unstack() # the returned dataframe is a multi-indexed dataframe so we have to unstack it >>> df.shape (14245, 12) # get name of all stations as list >>> stns = dataset.stations() >>> len(stns) 593 # get data by station id >>> df = dataset.fetch(stations='46035000', as_dataframe=True).unstack() >>> df.shape (14245, 12) # get names of available dynamic features >>> dataset.dynamic_features # get only selected dynamic features >>> df = dataset.fetch(1, as_dataframe=True, ... dynamic_features=['precipitation_cpc', 'evapotransp_mgb', 'temperature_mean', 'streamflow_m3s']).unstack() >>> df.shape (14245, 4) # get names of available static features >>> dataset.static_features # get data of 10 random stations >>> df = dataset.fetch(10, as_dataframe=True) >>> df.shape (170940, 10) # remember this is multi-indexed DataFrame """ url = "" folders = {'streamflow_m3s': '02_CAMELS_BR_streamflow_m3s', 'streamflow_mm': '03_CAMELS_BR_streamflow_mm_selected_catchments', 'simulated_streamflow_m3s': '04_CAMELS_BR_streamflow_simulated', 'precipitation_cpc': '07_CAMELS_BR_precipitation_cpc', 'precipitation_mswep': '06_CAMELS_BR_precipitation_mswep', 'precipitation_chirps': '05_CAMELS_BR_precipitation_chirps', 'evapotransp_gleam': '08_CAMELS_BR_evapotransp_gleam', 'evapotransp_mgb': '09_CAMELS_BR_evapotransp_mgb', 'potential_evapotransp_gleam': '10_CAMELS_BR_potential_evapotransp_gleam', 'temperature_min': '11_CAMELS_BR_temperature_min_cpc', 'temperature_mean': '12_CAMELS_BR_temperature_mean_cpc', 'temperature_max': '13_CAMELS_BR_temperature_max_cpc' }
[docs] def __init__(self, path=None): super().__init__(path=path, name="CAMELS-BR") self.ds_dir = path self._download() self._maybe_to_netcdf('camels_dyn_br')
@property def _all_dirs(self): """All the folders in the dataset_directory""" return [f for f in os.listdir(self.ds_dir) if os.path.isdir(os.path.join(self.ds_dir, f))] @property def static_dir(self): path = None for _dir in self._all_dirs: if "attributes" in _dir: # supposing that 'attributes' axist in only one file/folder in self.ds_dir path = os.path.join(self.ds_dir, f'{_dir}{SEP}{_dir}') return path @property def static_files(self): all_files = None if self.static_dir is not None: all_files = glob.glob(f"{self.static_dir}/*.txt") return all_files @property def dynamic_features(self) -> list: return list(CAMELS_BR.folders.keys()) @property def static_attribute_categories(self): static_attrs = [] for f in self.static_files: ff = str(os.path.basename(f).split('.txt')[0]) static_attrs.append('_'.join(ff.split('_')[2:])) return static_attrs @property def static_features(self): static_fpath = os.path.join(self.ds_dir, 'static_features.csv') if not os.path.exists(static_fpath): files = glob.glob( f"{os.path.join(self.ds_dir, '01_CAMELS_BR_attributes', '01_CAMELS_BR_attributes')}/*.txt") cols = [] for f in files: _df = pd.read_csv(f, sep=' ', index_col='gauge_id', nrows=1) cols += list(_df.columns) else: df = pd.read_csv(static_fpath, index_col='gauge_id', nrows=1) cols = list(df.columns) return cols @property def start(self): return "19800101" @property def end(self): return "20181231"
[docs] def all_stations(self, attribute) -> list: """Tells all station ids for which a data of a specific attribute is available.""" all_files = [] for _attr, _dir in self.folders.items(): if attribute in _attr: all_files = os.listdir(os.path.join(self.ds_dir, f'{_dir}{SEP}{_dir}')) stations = [] for f in all_files: stations.append(str(f.split('_')[0])) return stations
[docs] def stations(self, to_exclude=None) -> list: """Returns a list of station ids which are common among all dynamic attributes. Example ------- >>> dataset = CAMELS_BR() >>> stations = dataset.stations() """ if to_exclude is not None: if not isinstance(to_exclude, list): assert isinstance(to_exclude, str) to_exclude = [to_exclude] else: to_exclude = [] stations = {} for dyn_attr in self.dynamic_features: if dyn_attr not in to_exclude: stations[dyn_attr] = self.all_stations(dyn_attr) stns = list(set.intersection(*map(set, list(stations.values())))) return stns
def _read_dynamic_from_csv(self, stations, attributes: Union[str, list] = 'all', st=None, en=None, ): """ returns the dynamic/time series attribute/attributes for one station id. Example ------- >>> dataset = CAMELS_BR() >>> pcp = dataset.fetch_dynamic_features('10500000', 'precipitation_cpc') ... # fetch all time series data associated with a station. >>> x = dataset.fetch_dynamic_features('51560000', dataset.dynamic_features) """ attributes = check_attributes(attributes, self.dynamic_features) dyn = {} for stn_id in stations: # making one separate dataframe for one station data = pd.DataFrame() for attr, _dir in self.folders.items(): if attr in attributes: path = os.path.join(self.ds_dir, f'{_dir}{SEP}{_dir}') # supposing that the filename starts with stn_id and has .txt extension. fname = [f for f in os.listdir(path) if f.startswith(str(stn_id)) and f.endswith('.txt')] fname = fname[0] if os.path.exists(os.path.join(path, fname)): df = pd.read_csv(os.path.join(path, fname), sep=' ') df.index = pd.to_datetime(df[['year', 'month', 'day']]) df.index.freq = pd.infer_freq(df.index) df = df[st:en] # only read one column which matches the attr # todo, qual_flag maybe important [df.pop(item) for item in df.columns if item != attr] data = pd.concat([data, df], axis=1) else: raise FileNotFoundError(f"file {fname} not found at {path}") dyn[stn_id] = data return dyn
[docs] def fetch_static_features( self, stn_id: Union[str, List[str]], features:Union[str, List[str]]=None ) -> pd.DataFrame: """ Parameters ---------- stn_id : int/list station id whose attribute to fetch features : str/list name of attribute to fetch. Default is None, which will return all the attributes for a particular station of the specified category. Example ------- >>> dataset = Camels('CAMELS-BR') >>> df = dataset.fetch_static_features('11500000', 'climate') # read all static features of all stations >>> data = dataset.fetch_static_features(dataset.stations(), dataset.static_features) >>> data.shape (597, 67) """ if isinstance(stn_id, int): station = [str(stn_id)] elif isinstance(stn_id, list): station = [str(stn) for stn in stn_id] elif isinstance(stn_id, str): station = [stn_id] else: raise ValueError attributes = check_attributes(features, self.static_features) static_fpath = os.path.join(self.ds_dir, 'static_features.csv') if not os.path.exists(static_fpath): files = glob.glob( f"{os.path.join(self.ds_dir, '01_CAMELS_BR_attributes', '01_CAMELS_BR_attributes')}/*.txt") static_df = pd.DataFrame() for f in files: _df = pd.read_csv(f, sep=' ', index_col='gauge_id') static_df = pd.concat([static_df, _df], axis=1) static_df.to_csv(static_fpath, index_label='gauge_id') else: static_df = pd.read_csv(static_fpath, index_col='gauge_id') static_df.index = static_df.index.astype(str) return pd.DataFrame(static_df.loc[station][attributes])
[docs]class CAMELS_GB(Camels): """ This dataset must be manually downloaded by the user. The path of the downloaded folder must be provided while initiating this class. """ dynamic_features = ["precipitation", "pet", "temperature", "discharge_spec", "discharge_vol", "peti", "humidity", "shortwave_rad", "longwave_rad", "windspeed"]
[docs] def __init__(self, path=None): super().__init__(name="CAMELS-GB", path=path) self._maybe_to_netcdf('camels_gb_dyn')
@property def ds_dir(self): """Directory where a particular dataset will be saved. """ return self._ds_dir @ds_dir.setter def ds_dir(self, x): sanity_check('CAMELS-GB', x) self._ds_dir = x @property def static_attribute_categories(self) -> list: attributes = [] path = os.path.join(self.ds_dir, 'data') for f in os.listdir(path): if os.path.isfile(os.path.join(path, f)) and f.endswith('csv'): attributes.append(f.split('_')[2]) return attributes @property def start(self): return "19701001" @property def end(self): return "20150930" @property def static_features(self): files = glob.glob(f"{os.path.join(self.ds_dir, 'data')}/*.csv") cols = [] for f in files: if 'static_features.csv' not in f: df = pd.read_csv(f, nrows=1, index_col='gauge_id') cols += (list(df.columns)) return cols
[docs] def stations(self, to_exclude=None): # CAMELS_GB_hydromet_timeseries_StationID_number path = os.path.join(self.ds_dir, f'data{SEP}timeseries') gauge_ids = [] for f in os.listdir(path): gauge_ids.append(f.split('_')[4]) return gauge_ids
def _read_dynamic_from_csv(self, stations, attributes: Union[str, list] = 'all', st=None, en=None, ): """Fetches dynamic attribute/attributes of one station.""" dyn = {} for stn_id in stations: # making one separate dataframe for one station path = os.path.join(self.ds_dir, f"data{SEP}timeseries") fname = None for f in os.listdir(path): if stn_id in f: fname = f break df = pd.read_csv(os.path.join(path, fname), index_col='date') df.index = pd.to_datetime(df.index) df.index.freq = pd.infer_freq(df.index) dyn[stn_id] = df return dyn
[docs] def fetch_static_features( self, stn_id: Union[str, List[str]], features:Union[str, List[str]]="all" ) -> pd.DataFrame: """ Fetches static attributes of one or more stations for one or more category as dataframe. Parameters ---------- stn_id : str name/id of station of which to extract the data features : list/str, optional (default="all") The name/names of features to fetch. By default, all available static features are returned. Examples --------- >>> from ai4water.datasets import CAMELS_GB >>> dataset = CAMELS_GB() get the names of stations >>> stns = dataset.stations() >>> len(stns) 671 get all static data of all stations >>> static_data = dataset.fetch_static_features(stns) >>> static_data.shape (671, 290) get static data of one station only >>> static_data = dataset.fetch_static_features('85004') >>> static_data.shape (1, 290) get the names of static features >>> dataset.static_features get only selected features of all stations >>> static_data = dataset.fetch_static_features(stns, ['area', 'elev_mean']) >>> static_data.shape (671, 2) """ attributes = check_attributes(features, self.static_features) static_fname = 'static_features.csv' static_fpath = os.path.join(self.ds_dir, 'data', static_fname) if os.path.exists(static_fpath): static_df = pd.read_csv(static_fpath, index_col='gauge_id') else: files = glob.glob(f"{os.path.join(self.ds_dir, 'data')}/*.csv") static_df = pd.DataFrame() for f in files: _df = pd.read_csv(f, index_col='gauge_id') static_df = pd.concat([static_df, _df], axis=1) static_df.to_csv(static_fpath) if isinstance(stn_id, str): station = [stn_id] elif isinstance(stn_id, int): station = [str(stn_id)] elif isinstance(stn_id, list): station = [str(stn) for stn in stn_id] else: raise ValueError static_df.index = static_df.index.astype(str) return static_df.loc[station][attributes]
[docs]class CAMELS_AUS(Camels): """ Inherits from Camels class. Reads CAMELS-AUS dataset of `Fowler et al., 2020 <>`_ dataset. Examples -------- >>> from ai4water.datasets import CAMELS_AUS >>> dataset = CAMELS_AUS() >>> df = dataset.fetch(stations=1, as_dataframe=True) >>> df = df.unstack() # the returned dataframe is a multi-indexed dataframe so we have to unstack it >>> df.shape (21184, 26) ... # get name of all stations as list >>> stns = dataset.stations() >>> len(stns) 222 ... # get data by station id >>> df = dataset.fetch(stations='224214A', as_dataframe=True).unstack() >>> df.shape (21184, 26) ... # get names of available dynamic features >>> dataset.dynamic_features ... # get only selected dynamic features >>> data = dataset.fetch(1, as_dataframe=True, ... dynamic_features=['tmax_AWAP', 'precipitation_AWAP', 'et_morton_actual_SILO', 'streamflow_MLd']).unstack() >>> data.shape (21184, 4) ... # get names of available static features >>> dataset.static_features ... # get data of 10 random stations >>> df = dataset.fetch(10, as_dataframe=True) >>> df.shape # remember this is a multiindexed dataframe (21184, 260) """ url = '' urls = { "": "", "": "", "": "", "": "", "": "", "CAMELS_AUS_Attributes-Indices_MasterTable.csv": "", "Units_01_TimeseriesData.pdf": "", "Units_02_AttributeMasterTable.pdf": "", } folders = { 'streamflow_MLd': f'03_streamflow{SEP}03_streamflow{SEP}streamflow_MLd', 'streamflow_MLd_inclInfilled': f'03_streamflow{SEP}03_streamflow{SEP}streamflow_MLd_inclInfilled', 'streamflow_mmd': f'03_streamflow{SEP}03_streamflow{SEP}streamflow_mmd', 'et_morton_actual_SILO': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}02_EvaporativeDemand_timeseries{SEP}et_morton_actual_SILO', 'et_morton_point_SILO': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}02_EvaporativeDemand_timeseries{SEP}et_morton_point_SILO', 'et_morton_wet_SILO': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}02_EvaporativeDemand_timeseries{SEP}et_morton_wet_SILO', 'et_short_crop_SILO': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}02_EvaporativeDemand_timeseries{SEP}et_short_crop_SILO', 'et_tall_crop_SILO': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}02_EvaporativeDemand_timeseries{SEP}et_tall_crop_SILO', 'evap_morton_lake_SILO': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}02_EvaporativeDemand_timeseries{SEP}evap_morton_lake_SILO', 'evap_pan_SILO': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}02_EvaporativeDemand_timeseries{SEP}evap_pan_SILO', 'evap_syn_SILO': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}02_EvaporativeDemand_timeseries{SEP}evap_syn_SILO', 'precipitation_AWAP': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}01_precipitation_timeseries{SEP}precipitation_AWAP', 'precipitation_SILO': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}01_precipitation_timeseries{SEP}precipitation_SILO', 'precipitation_var_SWAP': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}01_precipitation_timeseries{SEP}precipitation_var_AWAP', 'solarrad_AWAP': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}03_Other{SEP}AWAP{SEP}solarrad_AWAP', 'tmax_AWAP': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}03_Other{SEP}AWAP{SEP}tmax_AWAP', 'tmin_AWAP': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}03_Other{SEP}AWAP{SEP}tmin_AWAP', 'vprp_AWAP': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}03_Other{SEP}AWAP{SEP}vprp_AWAP', 'mslp_SILO': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}03_Other{SEP}SILO{SEP}mslp_SILO', 'radiation_SILO': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}03_Other{SEP}SILO{SEP}radiation_SILO', 'rh_tmax_SILO': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}03_Other{SEP}SILO{SEP}rh_tmax_SILO', 'rh_tmin_SILO': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}03_Other{SEP}SILO{SEP}rh_tmin_SILO', 'tmax_SILO': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}03_Other{SEP}SILO{SEP}tmax_SILO', 'tmin_SILO': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}03_Other{SEP}SILO{SEP}tmin_SILO', 'vp_deficit_SILO': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}03_Other{SEP}SILO{SEP}vp_deficit_SILO', 'vp_SILO': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}03_Other{SEP}SILO{SEP}vp_SILO', }
[docs] def __init__(self, path: str = None): """ Arguments: path: path where the CAMELS-AUS dataset has been downloaded. This path must contain five zip files and one xlsx file. If None, then the data will downloaded. """ if path is not None: assert isinstance(path, str), f'path must be string like but it is "{path}" of type {path.__class__.__name__}' if not os.path.exists(path) or len(os.listdir(path)) < 2: raise FileNotFoundError(f"The path {path} does not exist") self.ds_dir = path super().__init__(path=path) if not os.path.exists(self.ds_dir): os.makedirs(self.ds_dir) for _file, url in self.urls.items(): fpath = os.path.join(self.ds_dir, _file) if not os.path.exists(fpath): download(url + _file, fpath) _unzip(self.ds_dir) self._maybe_to_netcdf('camels_aus_dyn')
@property def start(self): return "19570101" @property def end(self): return "20181231" @property def location(self): return "Australia"
[docs] def stations(self, as_list=True) -> list: fname = os.path.join(self.ds_dir, f"01_id_name_metadata{SEP}01_id_name_metadata{SEP}id_name_metadata.csv") df = pd.read_csv(fname) if as_list: return df['station_id'].to_list() else: return df
@property def static_attribute_categories(self): attributes = [] path = os.path.join(self.ds_dir, f'04_attributes{SEP}04_attributes') for f in os.listdir(path): if os.path.isfile(os.path.join(path, f)) and f.endswith('csv'): f = str(f.split('.csv')[0]) attributes.append(''.join(f.split('_')[2:])) return attributes @property def static_features(self) -> list: static_fpath = os.path.join(self.ds_dir, 'static_features.csv') if not os.path.exists(static_fpath): files = glob.glob(f"{os.path.join(self.ds_dir, '04_attributes', '04_attributes')}/*.csv") cols = [] for f in files: _df = pd.read_csv(f, index_col='station_id', nrows=1) cols += list(_df.columns) else: df = pd.read_csv(static_fpath, index_col='station_id', nrows=1) cols = list(df.columns) return cols @property def dynamic_features(self) -> list: return list(self.folders.keys()) def _read_static(self, stations, attributes, st=None, en=None): attributes = check_attributes(attributes, self.static_features) static_fname = 'static_features.csv' static_fpath = os.path.join(self.ds_dir, static_fname) if os.path.exists(static_fpath): static_df = pd.read_csv(static_fpath, index_col='station_id') else: files = glob.glob(f"{os.path.join(self.ds_dir, '04_attributes', '04_attributes')}/*.csv") static_df = pd.DataFrame() for f in files: _df = pd.read_csv(f, index_col='station_id') static_df = pd.concat([static_df, _df], axis=1) static_df.to_csv(static_fpath) static_df.index = static_df.index.astype(str) df = static_df.loc[stations][attributes] if isinstance(df, pd.Series): df = pd.DataFrame(df).transpose() return self.to_ts(df, st, en) def _read_dynamic_from_csv(self, stations, dynamic_features, **kwargs): dyn_attrs = {} dyn = {} for _attr in dynamic_features: _path = os.path.join(self.ds_dir, f'{self.folders[_attr]}.csv') _df = pd.read_csv(_path, na_values=['-99.99']) _df.index = pd.to_datetime(_df[['year', 'month', 'day']]) [_df.pop(col) for col in ['year', 'month', 'day']] dyn_attrs[_attr] = _df # making one separate dataframe for one station for stn in stations: stn_df = pd.DataFrame() for attr, attr_df in dyn_attrs.items(): if attr in dynamic_features: stn_df[attr] = attr_df[stn] dyn[stn] = stn_df return dyn
[docs] def fetch_static_features( self, stn_id: Union[str, List[str]], features:Union[str, List[str]]="all", **kwargs ) -> pd.DataFrame: """Fetches static attribuets of one or more stations as dataframe. Parameters ---------- stn_id : str name/id of station of which to extract the data features : list/str, optional (default="all") The name/names of features to fetch. By default, all available static features are returned. Examples --------- >>> from ai4water.datasets import CAMELS_AUS >>> dataset = CAMELS_AUS() get the names of stations >>> stns = dataset.stations() >>> len(stns) 222 get all static data of all stations >>> static_data = dataset.fetch_static_features(stns) >>> static_data.shape (222, 110) get static data of one station only >>> static_data = dataset.fetch_static_features('305202') >>> static_data.shape (1, 110) get the names of static features >>> dataset.static_features get only selected features of all stations >>> static_data = dataset.fetch_static_features(stns, ['catchment_di', 'elev_mean']) >>> static_data.shape (222, 2) """ return self._read_static(stn_id, features)
[docs] def plot(self, what, stations=None, **kwargs): assert what in ['outlets', 'boundaries'] f1 = os.path.join(self.ds_dir, f'02_location_boundary_area{SEP}02_location_boundary_area{SEP}shp{SEP}CAMELS_AUS_BasinOutlets_adopted.shp') f2 = os.path.join(self.ds_dir, f'02_location_boundary_area{SEP}02_location_boundary_area{SEP}shp{SEP}bonus data{SEP}Australia_boundaries.shp') if plot_shapefile is not None: return plot_shapefile(f1, bbox_shp=f2, recs=stations, rec_idx=0, **kwargs) else: raise ModuleNotFoundError("Shapely must be installed in order to plot the datasets.")
[docs]class CAMELS_CL(Camels): """ Downloads and processes CAMELS dataset of Chile following the work of Alvarez-Garreton_ et al., 2018 . Examples --------- >>> from ai4water.datasets import CAMELS_CL >>> dataset = CAMELS_CL() >>> df = dataset.fetch(stations=1, as_dataframe=True) >>> df = df.unstack() # the returned dataframe is a multi-indexed dataframe so we have to unstack it >>> df.shape (38374, 12) # get name of all stations as list >>> stns = dataset.stations() >>> len(stns) 516 # get data by station id >>> df = dataset.fetch(stations='11130001', as_dataframe=True).unstack() >>> df.shape (38374, 12) # get names of available dynamic features >>> dataset.dynamic_features # get only selected dynamic features >>> df = dataset.fetch(1, as_dataframe=True, ... dynamic_features=['pet_hargreaves', 'precip_tmpa', 'tmean_cr2met', 'streamflow_m3s']).unstack() >>> df.shape (38374, 4) # get names of available static features >>> dataset.static_features # get data of 10 random stations >>> df = dataset.fetch(10, as_dataframe=True) >>> df.shape (460488, 10) .. _Alvarez-Garreton: """ urls = { "": "", "": "", "": "", "": "", "": "", "": "", "": "", "": "", "": "", "": "", "": "", "": "", "": "", "": "", "": "", } dynamic_features = ['streamflow_m3s', 'streamflow_mm', 'precip_cr2met', 'precip_chirps', 'precip_mswep', 'precip_tmpa', 'tmin_cr2met', 'tmax_cr2met', 'tmean_cr2met', 'pet_8d_modis', 'pet_hargreaves', 'swe' ]
[docs] def __init__(self, path: str = None ): """ Arguments: path: path where the CAMELS-CL dataset has been downloaded. This path must contain five zip files and one xlsx file. """ super().__init__(path=path) self.ds_dir = path if not os.path.exists(self.ds_dir): os.makedirs(self.ds_dir) for _file, url in self.urls.items(): fpath = os.path.join(self.ds_dir, _file) if not os.path.exists(fpath): download(url + _file, fpath) _unzip(self.ds_dir) self.dyn_fname = os.path.join(self.ds_dir, '') self._maybe_to_netcdf('camels_cl_dyn')
@property def _all_dirs(self): """All the folders in the dataset_directory""" return [f for f in os.listdir(self.ds_dir) if os.path.isdir(os.path.join(self.ds_dir, f))] @property def start(self): return "19130215" @property def end(self): return "20180309" @property def static_features(self) -> list: path = os.path.join(self.ds_dir, f"1_CAMELScl_attributes{SEP}1_CAMELScl_attributes.txt") df = pd.read_csv(path, sep='\t', index_col='gauge_id') return df.index.to_list()
[docs] def stations(self) -> list: """ Tells all station ids for which a data of a specific attribute is available. """ stn_fname = os.path.join(self.ds_dir, 'stations.json') if not os.path.exists(stn_fname): _stations = {} for dyn_attr in self.dynamic_features: for _dir in self._all_dirs: if dyn_attr in _dir: fname = os.path.join(self.ds_dir, f"{_dir}{SEP}{_dir}.txt") df = pd.read_csv(fname, sep='\t', nrows=2, index_col='gauge_id') _stations[dyn_attr] = list(df.columns) stns = list(set.intersection(*map(set, list(_stations.values())))) with open(stn_fname, 'w') as fp: json.dump(stns, fp) else: with open(stn_fname, 'r') as fp: stns = json.load(fp) return stns
def _read_dynamic_from_csv(self, stations, dynamic_features, st=None, en=None): dyn = {} st, en = self._check_length(st, en) assert all(stn in self.stations() for stn in stations) dynamic_features = check_attributes(dynamic_features, self.dynamic_features) # reading all dynnamic attributes dyn_attrs = {} for attr in dynamic_features: fname = [f for f in self._all_dirs if '_' + attr in f][0] fname = os.path.join(self.ds_dir, f'{fname}{SEP}{fname}.txt') _df = pd.read_csv(fname, sep='\t', index_col=['gauge_id'], na_values=" ") _df.index = pd.to_datetime(_df.index) dyn_attrs[attr] = _df[st:en] # making one separate dataframe for one station for stn in stations: stn_df = pd.DataFrame() for attr, attr_df in dyn_attrs.items(): if attr in dynamic_features: stn_df[attr] = attr_df[stn] dyn[stn] = stn_df[st:en] return dyn def _read_static(self, stations: list, attributes: list) -> pd.DataFrame: # overwritten for speed path = os.path.join(self.ds_dir, f"1_CAMELScl_attributes{SEP}1_CAMELScl_attributes.txt") _df = pd.read_csv(path, sep='\t', index_col='gauge_id') stns_df = [] for stn in stations: df = pd.DataFrame() if stn in _df: df[stn] = _df[stn] elif ' ' + stn in _df: df[stn] = _df[' ' + stn] stns_df.append(df.transpose()[attributes]) stns_df = pd.concat(stns_df) return stns_df
[docs] def fetch_static_features( self, stn_id: Union[str, List[str]], features:Union[str, List[str]]=None ): """ Returns static features of one or more stations. Parameters ---------- stn_id : str name/id of station of which to extract the data features : list/str, optional (default="all") The name/names of features to fetch. By default, all available static features are returned. Examples --------- >>> from ai4water.datasets import CAMELS_CL >>> dataset = CAMELS_CL() get the names of stations >>> stns = dataset.stations() >>> len(stns) 516 get all static data of all stations >>> static_data = dataset.fetch_static_features(stns) >>> static_data.shape (516, 104) get static data of one station only >>> static_data = dataset.fetch_static_features('11315001') >>> static_data.shape (1, 104) get the names of static features >>> dataset.static_features get only selected features of all stations >>> static_data = dataset.fetch_static_features(stns, ['slope_mean', 'area']) >>> static_data.shape (516, 2) >>> data = dataset.fetch_static_features('2110002', features=['slope_mean', 'area']) >>> data.shape (1, 2) """ attributes = check_attributes(features, self.static_features) if isinstance(stn_id, str): stn_id = [stn_id] return self._read_static(stn_id, attributes)
[docs]class HYPE(Camels): """ Downloads and preprocesses HYPE [1]_ dataset from Lindstroem et al., 2010 [2]_ . This is a rainfall-runoff dataset of Sweden of 564 stations from 1985 to 2019 at daily, monthly and yearly time steps. Examples -------- >>> from ai4water.datasets import HYPE >>> dataset = HYPE() ... # get data of 5% of stations >>> df = dataset.fetch(stations=0.05, as_dataframe=True) # returns a multiindex dataframe >>> df.shape (115047, 28) ... # fetch data of 5 (randomly selected) stations >>> df = dataset.fetch(stations=5, as_dataframe=True) >>> df.shape (115047, 5) fetch data of 3 selected stations >>> df = dataset.fetch(stations=['564','563','562'], as_dataframe=True) >>> df.shape (115047, 3) ... # fetch data of a single stations >>> df = dataset.fetch(stations='500', as_dataframe=True) (115047, 1) # get only selected dynamic features >>> df = dataset.fetch(stations='501', ... dynamic_features=['AET_mm', 'Prec_mm', 'Streamflow_mm'], as_dataframe=True) # fetch data between selected periods >>> df = dataset.fetch(stations='225', st="20010101", en="20101231", as_dataframe=True) >>> df.shape (32868, 1) ... # get data at monthly time step >>> dataset = HYPE(time_step="month") >>> df = dataset.fetch(stations='500', as_dataframe=True) >>> df.shape (3780, 1) .. [1] .. [2] """ url = [ "", "" ] dynamic_features = [ 'AET_mm', 'Baseflow_mm', 'Infiltration_mm', 'SM_mm', 'Streamflow_mm', 'Runoff_mm', 'Qsim_m3-s', 'Prec_mm', 'PET_mm' ]
[docs] def __init__(self, time_step: str = 'daily', path = None, **kwargs): """ Parameters ---------- time_step : str one of ``daily``, ``month`` or ``year`` **kwargs key word arguments """ assert time_step in ['daily', 'month', 'year'] self.time_step = time_step self.ds_dir = path super().__init__(path=path, **kwargs) self._download() fpath = os.path.join(self.ds_dir, '') if not os.path.exists(fpath): self.time_step = 'daily' self._maybe_to_netcdf('hype_daily_dyn') self.time_step = 'month' self._maybe_to_netcdf('hype_month_dyn') self.time_step = 'year' self._maybe_to_netcdf('hype_year_dyn') self.time_step = time_step self.dyn_fname = os.path.join(self.ds_dir, f'hype_{time_step}')
[docs] def stations(self) -> list: _stations = np.arange(1, 565).astype(str) return list(_stations)
@property def static_features(self): return [] def _read_dynamic_from_csv(self, stations: list, attributes: Union[str, list] = 'all', st=None, en=None, ): dynamic_features = check_attributes(attributes, self.dynamic_features) _dynamic_attributes = [] for dyn_attr in dynamic_features: pref, suff = dyn_attr.split('_')[0], dyn_attr.split('_')[-1] _dyn_attr = f"{pref}_{self.time_step}_{suff}" _dynamic_attributes.append(_dyn_attr) df_attrs = {} for dyn_attr in _dynamic_attributes: fname = f"{dyn_attr}.csv" fpath = os.path.join(self.ds_dir, fname) index_col_name = 'DATE' if fname in ['SM_month_mm.csv', 'SM_year_mm.csv']: index_col_name = 'Date' _df = pd.read_csv(fpath, index_col=index_col_name) _df.index = pd.to_datetime(_df.index) # todo, some stations have wider range than df_attrs[dyn_attr] = _df.loc[self.start:self.end] stns_dfs = {} for st in stations: stn_dfs = [] cols = [] for dyn_attr, dyn_df in df_attrs.items(): stn_dfs.append(dyn_df[st]) col_name = f"{dyn_attr.split('_')[0]}_{dyn_attr.split('_')[-1]}" # get original name without time_step cols.append(col_name) stn_df = pd.concat(stn_dfs, axis=1) stn_df.columns = cols stns_dfs[st] = stn_df return stns_dfs
[docs] def fetch_static_features(self, stn_id, features=None): """static data for HYPE is not available.""" raise ValueError(f'No static feature for {}')
@property def start(self): return '19850101' @property def end(self): return '20191231'
[docs]class WaterBenchIowa(Camels): """ Rainfall run-off dataset for Iowa (US) following the work of `Demir et al., 2022 <>`_ Examples -------- >>> from ai4water.datasets import WaterBenchIowa >>> ds = WaterBenchIowa() ... # fetch static and dynamic features of 5 stations >>> data = ds.fetch(5, as_dataframe=True) >>> data.shape # it is a multi-indexed DataFrame (184032, 5) ... # fetch both static and dynamic features of 5 stations >>> data = ds.fetch(5, static_features="all", as_dataframe=True) >>> data.keys() dict_keys(['dynamic', 'static']) >>> data['static'].shape (5, 7) >>> data['dynamic'] # returns a xarray DataSet ... # using another method >>> data = ds.fetch_dynamic_features('644', as_dataframe=True) >>> data.unstack().shape (61344, 3) """ url = ""
[docs] def __init__(self, path=None): super(WaterBenchIowa, self).__init__(path=path) self._download() self._maybe_to_netcdf('')
[docs] def stations(self)->List[str]: return [fname.split('_')[0] for fname in os.listdir(self.ts_path) if fname.endswith('.csv')]
@property def ts_path(self)->str: return os.path.join(self.ds_dir, 'data_time_series', 'data_time_series') @property def dynamic_features(self) -> List[str]: return ['precipitation', 'et', 'discharge'] @property def static_features(self)->List[str]: return ['travel_time', 'area', 'slope', 'loam', 'silt', 'sandy_clay_loam', 'silty_clay_loam']
[docs] def fetch_station_attributes( self, station: str, dynamic_features: Union[str, list, None] = 'all', static_features: Union[str, list, None] = None, as_ts: bool = False, st: Union[str, None] = None, en: Union[str, None] = None, **kwargs ) -> pd.DataFrame: """ Examples -------- >>> from ai4water.datasets import WaterBenchIowa >>> dataset = WaterBenchIowa() >>> data = dataset.fetch_station_attributes('666') """ check_attributes(dynamic_features, self.dynamic_features) fname = os.path.join(self.ts_path, f"{station}_data.csv") df = pd.read_csv(fname) df.index = pd.to_datetime(df.pop('datetime')) return df
[docs] def fetch_static_features( self, stn_id: Union[str, List[str]], features:Union[str, List[str]]=None )->pd.DataFrame: """ Parameters ---------- stn_id : str name/id of station of which to extract the data features : list/str, optional (default="all") The name/names of features to fetch. By default, all available static features are returned. Examples --------- >>> from ai4water.datasets import WaterBenchIowa >>> dataset = WaterBenchIowa() get the names of stations >>> stns = dataset.stations() >>> len(stns) 125 get all static data of all stations >>> static_data = dataset.fetch_static_features(stns) >>> static_data.shape (125, 7) get static data of one station only >>> static_data = dataset.fetch_static_features('592') >>> static_data.shape (1, 7) get the names of static features >>> dataset.static_features get only selected features of all stations >>> static_data = dataset.fetch_static_features(stns, ['slope', 'area']) >>> static_data.shape (125, 2) >>> data = dataset.fetch_static_features('592', features=['slope', 'area']) >>> data.shape (1, 2) """ if not isinstance(stn_id, list): stn_id = [stn_id] features = check_attributes(features, self.static_features) dfs = [] for stn in stn_id: fname = os.path.join(self.ts_path, f"{stn}_data.csv") df = pd.read_csv(fname, nrows=1) dfs.append(df[features]) return pd.concat(dfs)
def _read_dynamic_from_csv( self, stations, dynamic_features, st=None, en=None)->dict: dyn = dict() for stn in stations: fname = os.path.join(self.ts_path, f"{stn}_data.csv") df = pd.read_csv(fname) df.index = pd.to_datetime(df.pop('datetime')) dyn[stn] = df[self.dynamic_features] return dyn @property def start(self): return "20111001 12:00" @property def end(self): return "20180930 11:00"