Source code for ai4water.datasets.mtropics


import glob
import warnings
from subprocess import call
from typing import Union, Tuple
import datetime

try:
    from shapely.geometry import shape, mapping
    from shapely.ops import unary_union
except (ModuleNotFoundError, OSError):
    shape, mapping, unary_union = None, None, None

from ai4water.backend import os, shapefile, xr, np, pd, fiona

from .utils import check_attributes, check_st_en
from ._datasets import Datasets, maybe_download

from ai4water.preprocessing.resample import Resampler
from ai4water.preprocessing.spatial_utils import find_records


class MtropicsLaos(Datasets):
    """
    Downloads and prepares hydrological, climate and land use data for Laos
    from Mtropics_ website and ird_ data servers.

    Methods
    -------
    - fetch_lu
    - fetch_ecoli
    - fetch_rain_gauges
    - fetch_weather_station_data
    - fetch_pcp
    - fetch_hydro
    - make_regression

    .. _Mtropics: https://mtropics.obs-mip.fr/catalogue-m-tropics/

    .. _ird: https://dataverse.ird.fr/dataset.xhtml?persistentId=doi:10.23708/EWOYNK
    """
    target = ['Ecoli_mpn100']

    url = {
        'lu.zip': "https://services.sedoo.fr/mtropics/data/v1_0/download?collectionId=0f1aea48-2a51-9b42-7688-a774a8f75e7a",
        'pcp.zip': "https://services.sedoo.fr/mtropics/data/v1_0/download?collectionId=3c870a03-324b-140d-7d98-d3585a63e6ec",
        'hydro.zip': "https://services.sedoo.fr/mtropics/data/v1_0/download?collectionId=389bbea0-7279-12c1-63d0-cfc4a77ded87",
        'rain_guage.zip': "https://services.sedoo.fr/mtropics/data/v1_0/download?collectionId=7bc45591-5b9f-a13d-90dc-f2a75b0a15cc",
        'weather_station.zip': "https://services.sedoo.fr/mtropics/data/v1_0/download?collectionId=353d7f00-8d6a-2a34-c0a2-5903c64e800b",
        'ecoli_data.csv': "https://dataverse.ird.fr/api/access/datafile/5435",
        "ecoli_dict.csv": "https://dataverse.ird.fr/api/access/datafile/5436",
        "soilmap.zip": "https://dataverse.ird.fr/api/access/datafile/5430",
        "subs1.zip": "https://dataverse.ird.fr/api/access/datafile/5432",
        "suro.zip": "https://services.sedoo.fr/mtropics/data/v1_0/download?collectionId=f06cb605-7e59-4ba4-8faf-1beee35d2162",
        "surf_feat.zip": "https://services.sedoo.fr/mtropics/data/v1_0/download?collectionId=72d9e532-8910-48d2-b9a2-6c8b0241825b",
        "ecoli_source.csv": "https://dataverse.ird.fr/api/access/datafile/37737",
        "ecoli_source_readme.txt": "https://dataverse.ird.fr/api/access/datafile/37736",
        "ecoli_suro_gw.csv": "https://dataverse.ird.fr/api/access/datafile/37735",
        "ecoli_suro_gw_readme.txt": "https://dataverse.ird.fr/api/access/datafile/37734"
    }

    physio_chem_features = {
        "T_deg": "T",
        "EC_s/cm": "EC",
        "DO_percent": "DOpercent",
        "DO_mgl": "DO",
        "pH": "pH",
        "ORP_mV": "ORP",  # stream water oxidation-reduction potential
        "Turbidity_NTU": "Turbidity",
        "TSS_gL": "TSS",
    }

    weather_station_data = ['air_temp', 'rel_hum', 'wind_speed', 'sol_rad']
    inputs = weather_station_data + ['water_level', 'pcp', 'susp_pm', "Ecoli_source"]

    def __init__(
            self,
            path=None,
            save_as_nc: bool = True,
            convert_to_csv: bool = False,
            **kwargs):

        if xr is None:
            raise ModuleNotFoundError(
                "xarray must be installed to use datasets sub-module")

        super().__init__(path=path, **kwargs)
        self.save_as_nc = save_as_nc
        self.ds_dir = path
        self.convert_to_csv = convert_to_csv
        self._download()

        # we need to pre-process the land use shapefiles
        in_dir = os.path.join(self.ds_dir, 'lu')
        out_dir = os.path.join(self.ds_dir, 'lu1')
        if not os.path.exists(out_dir):
            os.makedirs(out_dir)

        files = glob.glob(f'{in_dir}/*.shp')

        for fpath in files:
            f = os.path.basename(fpath)
            shp_file = os.path.join(in_dir, f)
            op = os.path.join(out_dir, f)
            _process_laos_shpfiles(shp_file, op)

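    # A minimal usage sketch (an internet connection is assumed for the
    # first call, since ``__init__`` downloads the data; the local path
    # below is hypothetical):
    #
    # >>> from ai4water.datasets import MtropicsLaos
    # >>> laos = MtropicsLaos(path="/path/to/data")  # downloads on first use
    # >>> pcp = laos.fetch_pcp()
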
    def surface_features(
            self,
            st: Union[str, int, pd.Timestamp] = '2000-10-14',
            en: Union[str, int, pd.Timestamp] = '2016-11-12',
    ) -> pd.DataFrame:
        """soil surface features data"""

        fname = os.path.join(
            self.ds_dir, "surf_feat", "SEDOO_EdS_Houay Pano.xlsx")

        df = pd.read_excel(fname, sheet_name="Soil surface features")

        df.index = pd.to_datetime(df.pop('Date'))

        if st:
            if isinstance(st, int):
                assert isinstance(en, int)
                df = df.iloc[st:en]
            else:
                df = df.loc[st:en]

        return df

    def fetch_suro(
            self
    ) -> pd.DataFrame:
        """returns surface runoff and soil detachment data from Houay Pano,
        Lao PDR.

        Returns
        -------
        pd.DataFrame
            a dataframe of shape (293, 13)

        Examples
        --------
        >>> from ai4water.datasets import MtropicsLaos
        >>> laos = MtropicsLaos()
        >>> suro = laos.fetch_suro()
        """
        fname = os.path.join(
            self.ds_dir, 'suro', 'SEDOO_Runoff_Detachment_Houay Pano.xlsx')

        df = pd.read_excel(fname, sheet_name="Surface runoff soil detachment")
        return df.dropna()

    def fetch_lu(self, processed=False):
        """returns landuse_ data as a list of shapefiles.

        .. _landuse: https://doi.org/10.1038/s41598-017-04385-2
        """
        lu_dir = os.path.join(self.ds_dir, f"{'lu1' if processed else 'lu'}")
        files = glob.glob(f'{lu_dir}/*.shp')
        return files

    def fetch_physiochem(
            self,
            features: Union[list, str] = 'all',
            st: Union[str, pd.Timestamp] = '20110525 10:00:00',
            en: Union[str, pd.Timestamp] = '20210406 15:05:00',
    ) -> pd.DataFrame:
        """
        Fetches physio-chemical features of the Houay Pano catchment, Laos.

        Parameters
        ----------
        st :
            start of data.
        en :
            end of data.
        features :
            The physio-chemical features to fetch. Following features
            are available

            - ``T``
            - ``EC``
            - ``DOpercent``
            - ``DO``
            - ``pH``
            - ``ORP``
            - ``Turbidity``
            - ``TSS``

        Returns
        -------
        a pandas dataframe
        """

        if isinstance(features, list):
            _features = []
            for f in features:
                _features.append(self.physio_chem_features[f])
        else:
            assert isinstance(features, str)
            if features == 'all':
                _features = features
            else:
                _features = self.physio_chem_features[features]

        features = check_attributes(_features, list(self.physio_chem_features.values()))

        fname = os.path.join(self.ds_dir, 'ecoli_data.csv')
        df = pd.read_csv(fname, sep='\t')
        df.index = pd.to_datetime(df['Date_Time'])

        df = df[features]

        col_names = {v: k for k, v in self.physio_chem_features.items() if v in features}

        df = df.rename(columns=col_names)

        return df.loc[st:en]

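    # A minimal sketch of fetching selected physio-chemical features
    # (feature keys follow ``physio_chem_features``; the selection is
    # illustrative):
    #
    # >>> laos = MtropicsLaos()
    # >>> wq = laos.fetch_physiochem(features=['T_deg', 'pH'])
    # >>> wq.columns.tolist()
    # ['T_deg', 'pH']
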
    def fetch_ecoli(
            self,
            features: Union[list, str] = 'Ecoli_mpn100',
            st: Union[str, pd.Timestamp] = '20110525 10:00:00',
            en: Union[str, pd.Timestamp] = '20210406 15:05:00',
            remove_duplicates: bool = True,
    ) -> pd.DataFrame:
        """
        Fetches E. coli data collected at the outlet. See Ribolzi_ et al., 2021
        and Boithias_ et al., 2021 for reference. NaNs represent missing
        values. The data is randomly sampled between 2011 and 2021 during
        rainfall events. In total, 368 E. coli observation points are
        available.

        Parameters
        ----------
        st :
            start of data. By default the data is fetched from the point it
            is available.
        en :
            end of data. By default the data is fetched till the point it is
            available.
        features :
            E. coli concentration data. Following data are available

            - Ecoli_LL_mpn100: Lower limit of the confidence interval
            - Ecoli_mpn100: Stream water Escherichia coli concentration
            - Ecoli_UL_mpn100: Upper limit of the confidence interval
        remove_duplicates :
            whether to remove duplicates or not. This is relevant because
            some values were recorded within a minute of each other.

        Returns
        -------
        a pandas dataframe consisting of features as columns.

        .. _Ribolzi: https://dataverse.ird.fr/dataset.xhtml?persistentId=doi:10.23708/EWOYNK

        .. _Boithias: https://doi.org/10.1002/hyp.14126
        """
        fname = os.path.join(self.ds_dir, 'ecoli_data.csv')
        df = pd.read_csv(fname, sep='\t')
        df.index = pd.to_datetime(df['Date_Time'])

        available_features = {
            # Lower limit of the confidence interval
            "Ecoli_LL_mpn100": "E-coli_4dilutions_95%-CI-LL",
            # Stream water Escherichia coli concentration
            "Ecoli_mpn100": "E-coli_4dilutions",
            # Upper limit of the confidence interval
            "Ecoli_UL_mpn100": "E-coli_4dilutions_95%-CI-UL"
        }

        if isinstance(features, list):
            _features = []
            for f in features:
                _features.append(available_features[f])
        else:
            assert isinstance(features, str)
            if features == 'all':
                _features = features
            else:
                _features = available_features[features]

        features = check_attributes(_features, list(available_features.values()))

        if remove_duplicates:
            df = df[~df.index.duplicated(keep='first')]

        df = df.sort_index()

        df = df[features]

        col_names = {v: k for k, v in available_features.items() if v in features}

        df = df.rename(columns=col_names)

        return df.loc[st:en]

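    # A minimal sketch of fetching the concentration together with its
    # confidence limits (assuming ``features='all'`` returns all three
    # columns defined above):
    #
    # >>> laos = MtropicsLaos()
    # >>> ecoli = laos.fetch_ecoli(features='all')
    # >>> ecoli.columns.tolist()
    # ['Ecoli_LL_mpn100', 'Ecoli_mpn100', 'Ecoli_UL_mpn100']
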
    def fetch_rain_gauges(
            self,
            st: Union[str, pd.Timestamp] = "20010101",
            en: Union[str, pd.Timestamp] = "20191231",
    ) -> pd.DataFrame:
        """
        fetches data from 7 rain gauges_ which is collected at daily time
        step from 2001 to 2019.

        Parameters
        ----------
        st :
            start of data. By default the data is fetched from the point it
            is available.
        en :
            end of data. By default the data is fetched till the point it is
            available.

        Returns
        -------
        a dataframe of 7 columns, where each column represents observations
        from one rain gauge. The length of the dataframe depends upon the
        range defined by ``st`` and ``en`` arguments.

        Examples
        --------
        >>> from ai4water.datasets import MtropicsLaos
        >>> laos = MtropicsLaos()
        >>> rg = laos.fetch_rain_gauges()

        .. _gauges: https://doi.org/10.1038/s41598-017-04385-2
        """
        # todo, does nan mean 0 rainfall?

        fname = os.path.join(self.ds_dir, 'rain_guage', 'rain_guage.nc')
        if not os.path.exists(fname) or not self.save_as_nc:
            df = self._load_rain_gauge_from_xl_files()
        else:  # nc file already exists so load from it
            try:
                df = xr.load_dataset(fname).to_dataframe()
            except AttributeError:
                df = self._load_rain_gauge_from_xl_files()

        df.index = pd.date_range('20010101', periods=len(df), freq='D')

        return df[st:en]

    def _load_rain_gauge_from_xl_files(self):
        fname = os.path.join(self.ds_dir, 'rain_guage', 'rain_guage.nc')
        files = glob.glob(f"{os.path.join(self.ds_dir, 'rain_guage')}/*.xlsx")
        dfs = []
        for f in files:
            df = pd.read_excel(
                f,
                sheet_name='Daily',
                usecols=['R1', 'R2', 'R3', 'R4', 'R5', 'R6', 'R7'],
                keep_default_na=False)

            if os.path.basename(f) in ['OMPrawdataLaos2014.xlsx']:
                df = pd.read_excel(
                    f,
                    sheet_name='Daily',
                    usecols=['R1', 'R2', 'R3', 'R4', 'R5', 'R6', 'R7'],
                    keep_default_na=False,
                    nrows=366)

            df = df.dropna()
            dfs.append(df)

        df = pd.concat(dfs)

        for col in df.columns:
            df[col] = pd.to_numeric(df[col])

        df = df.reset_index(drop=True)  # index is of type Int64Index

        if self.save_as_nc:
            df.to_xarray().to_netcdf(fname)

        return df

    def fetch_weather_station_data(
            self,
            st: Union[str, pd.Timestamp] = "20010101 01:00:00",
            en: Union[str, pd.Timestamp] = "20200101 00:00:00",
            freq: str = 'H'
    ) -> pd.DataFrame:
        """
        fetches hourly weather [1]_ station data which consists of air
        temperature, humidity, wind speed and solar radiation.

        Parameters
        ----------
        st :
            start of data to be fetched.
        en :
            end of data to be fetched.
        freq :
            frequency at which the data is to be fetched.

        Returns
        -------
        a pandas dataframe consisting of 4 columns

        .. [1] https://doi.org/10.1038/s41598-017-04385-2
        """
        nc_fname = os.path.join(
            self.ds_dir, 'weather_station', 'weather_stations.nc')

        if not os.path.exists(nc_fname) or not self.save_as_nc:
            df = self._load_weather_stn_from_xl_files()
        else:  # nc file already exists so load from it
            try:
                df = xr.load_dataset(nc_fname).to_dataframe()
            except AttributeError:
                df = self._load_weather_stn_from_xl_files()

        df.index = pd.to_datetime(df.pop('datetime'))
        df.columns = self.weather_station_data

        df = df.asfreq('H')
        df = df.interpolate()
        df = df.bfill()

        return check_st_en(df, st, en)

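    # A minimal sketch of fetching the hourly weather data and resampling
    # it to daily means (the resampling step is illustrative, not part of
    # this method):
    #
    # >>> laos = MtropicsLaos()
    # >>> w = laos.fetch_weather_station_data()
    # >>> w_daily = w.resample('D').mean()
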
    def _load_weather_stn_from_xl_files(self):
        nc_fname = os.path.join(
            self.ds_dir, 'weather_station', 'weather_stations.nc')
        files = glob.glob(
            f"{os.path.join(self.ds_dir, 'weather_station')}/*.xlsx")

        vbsfile = os.path.join(
            self.ds_dir, "weather_station", 'ExcelToCsv.vbs')
        create_vbs_script(vbsfile)

        dataframes = []
        for xlsx_file in files:
            if not xlsx_file.startswith("~"):
                if os.name == "nt":
                    data_dir = os.path.join(self.ds_dir, "weather_station")
                    df = to_csv_and_read(
                        xlsx_file,
                        data_dir,
                        sheed_id='2',
                        usecols=['Date', 'Time', 'T', 'H', 'W', 'Gr'],
                        parse_dates={'datetime': ['Date', 'Time']})
                else:
                    df = pd.read_excel(
                        xlsx_file,
                        sheet_name='Hourly',
                        usecols=['Date', 'T', 'H', 'W', 'Gr'],
                        parse_dates={'datetime': ['Date']},
                        keep_default_na=False)

                df['datetime'] = pd.to_datetime(df['datetime'], errors='coerce')
                df = df.dropna(how="all")
                df.index = pd.to_datetime(df.pop('datetime'))
                dataframes.append(df)

        df = pd.concat(dataframes)
        del dataframes

        # non-numeric dtype causes problem in converting/saving netcdf
        for col in df.columns:
            df[col] = pd.to_numeric(df[col])

        df = df.reset_index()  # index is of type Int64Index

        if self.save_as_nc:
            df.to_xarray().to_netcdf(nc_fname)

        return df

    def fetch_pcp(self,
                  st: Union[str, pd.Timestamp] = '20010101 00:06:00',
                  en: Union[str, pd.Timestamp] = '20200101 00:06:00',
                  freq: str = '6min'
                  ) -> pd.DataFrame:
        """
        Fetches the precipitation_ data which is collected at 6 minutes
        time-step from 2001 to 2020.

        Parameters
        ----------
        st :
            starting point of data to be fetched.
        en :
            end point of data to be fetched.
        freq :
            frequency at which the data is to be returned.

        Returns
        -------
        pandas dataframe of precipitation data

        .. _precipitation: https://doi.org/10.1038/s41598-017-04385-2
        """
        # todo allow change in frequency

        fname = os.path.join(self.ds_dir, 'pcp', 'pcp.nc')
        # nc file does not exist
        if not os.path.exists(fname) or not self.save_as_nc:
            df = self._load_pcp_from_excel_files()
        else:  # nc file already exists so load from it
            try:
                df = xr.load_dataset(fname).to_dataframe()
            # on linux, it is giving AttributeError:
            # 'EntryPoints' object has no attribute 'get'
            except AttributeError:
                df = self._load_pcp_from_excel_files()

        df.index = pd.date_range('20010101 00:06:00', periods=len(df), freq='6min')
        df.columns = ['pcp']

        return df[st:en]

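    # A minimal sketch of fetching the 6-minute precipitation and
    # aggregating it to hourly sums (the aggregation is illustrative):
    #
    # >>> laos = MtropicsLaos()
    # >>> pcp = laos.fetch_pcp()
    # >>> pcp_hourly = pcp.resample('H').sum()
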
    def _load_pcp_from_excel_files(self):
        fname = os.path.join(self.ds_dir, 'pcp', 'pcp.nc')
        files = glob.glob(f"{os.path.join(self.ds_dir, 'pcp')}/*.xlsx")
        df = pd.DataFrame()
        for f in files:
            _df = pd.read_excel(f, sheet_name='6mn', usecols=['Rfa'])
            df = pd.concat([df, _df])

        df = df.reset_index(drop=True)

        if self.save_as_nc:
            df.to_xarray().to_netcdf(fname)

        return df

    def fetch_hydro(
            self,
            st: Union[str, pd.Timestamp] = '20010101 00:06:00',
            en: Union[str, pd.Timestamp] = '20200101 00:06:00',
    ) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """
        fetches water level (cm) and suspended particulate matter (g L-1).
        Both data are from 2001 to 2019 but are randomly sampled.

        Parameters
        ----------
        st : optional
            starting point of data to be fetched.
        en : optional
            end point of data to be fetched.

        Returns
        -------
        a tuple of pandas dataframes of water level and suspended particulate
        matter.
        """
        wl_fname = os.path.join(self.ds_dir, 'hydro', 'wl.nc')
        spm_fname = os.path.join(self.ds_dir, 'hydro', 'spm.nc')

        if not os.path.exists(wl_fname) or not self.save_as_nc:
            wl, spm = self._load_hydro_from_xl_files()
        else:
            try:
                wl = xr.load_dataset(wl_fname).to_dataframe()
                spm = xr.load_dataset(spm_fname).to_dataframe()
            except AttributeError:
                wl, spm = self._load_hydro_from_xl_files()

        wl = wl[~wl.index.duplicated(keep='first')]
        spm = spm[~spm.index.duplicated(keep='first')]

        # FutureWarning: Value based partial slicing on non-monotonic
        # DatetimeIndexes
        return wl.loc[st:en], spm.loc[st:en]

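    # A minimal sketch; note that this method returns a tuple of two
    # dataframes:
    #
    # >>> laos = MtropicsLaos()
    # >>> wl, spm = laos.fetch_hydro()
    # >>> wl.columns.tolist(), spm.columns.tolist()
    # (['water_level'], ['susp_pm'])
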
    def _load_hydro_from_xl_files(self):
        """Reads the hydro data from the raw excel files. The files are
        inconsistently formatted, hence the per-file corrections below."""
        wl_fname = os.path.join(self.ds_dir, 'hydro', 'wl.nc')
        spm_fname = os.path.join(self.ds_dir, 'hydro', 'spm.nc')

        print("reading data from xlsx files and saving them in netcdf format.")
        print("This will happen only once but will save io time.")

        files = glob.glob(f"{os.path.join(self.ds_dir, 'hydro')}/*.xlsx")
        wls = []
        spms = []
        for f in files:
            _df = pd.read_excel(f, sheet_name='Aperiodic')
            _wl = _df[['Date', 'Time', 'RWL04']]
            correct_time(_wl, 'Time')

            if os.path.basename(f) in ["OMPrawdataLaos2005.xlsx",
                                       "OMPrawdataLaos2001.xlsx",
                                       "OMPrawdataLaos2006.xlsx",
                                       "OMPrawdataLaos2012.xlsx",
                                       "OMPrawdataLaos2013.xlsx",
                                       "OMPrawdataLaos2014.xlsx"]:
                _wl = _wl.iloc[0:-1]
            if os.path.basename(f) in ["OMPrawdataLaos2011.xlsx"]:
                _wl = _wl.iloc[0:-1]
            if os.path.basename(f) in ["OMPrawdataLaos2008.xlsx"]:
                _wl = _wl.dropna()
            if os.path.basename(f) in ["OMPrawdataLaos2009.xlsx",
                                       "OMPrawdataLaos2010.xlsx",
                                       "OMPrawdataLaos2011.xlsx",
                                       "OMPrawdataLaos2015.xlsx",
                                       "OMPrawdataLaos2016.xlsx",
                                       "OMPrawdataLaos2017.xlsx",
                                       "OMPrawdataLaos2018.xlsx",
                                       "OMPrawdataLaos2019.xlsx",
                                       ]:
                _wl = _wl.dropna()

            index = _wl['Date'].astype(str) + ' ' + _wl['Time'].astype(str)
            _wl.index = pd.to_datetime(index)

            _spm = _df[['Date.1', 'Time.1', 'SPM04']]
            correct_time(_spm, 'Time.1')
            _spm = _spm.dropna()
            _spm = _spm.iloc[_spm.first_valid_index():_spm.last_valid_index()]

            if os.path.basename(f) == 'OMPrawdataLaos2016.xlsx':
                _spm.iloc[166] = ['2016-07-01', '20:43:47', 1.69388]
                _spm.iloc[247] = ['2016-07-23', '12:57:47', 8.15714]
                _spm.iloc[248] = ['2016-07-23', '17:56:47', 0.5]
                _spm.iloc[352] = ['2016-08-16', '03:08:17', 1.12711864406]

            if os.path.basename(f) == 'OMPrawdataLaos2017.xlsx':
                _spm.index = pd.to_datetime(_spm['Date.1'].astype(str))
            else:
                index = _spm['Date.1'].astype(str) + ' ' + _spm['Time.1'].astype(str)
                _spm.index = pd.to_datetime(index)

            wls.append(_wl['RWL04'])
            spms.append(_spm['SPM04'])

        wl = pd.DataFrame(pd.concat(wls))
        spm = pd.DataFrame(pd.concat(spms))

        wl.columns = ['water_level']
        spm.columns = ['susp_pm']

        if self.save_as_nc:
            try:
                wl.to_xarray().to_netcdf(wl_fname)
            except (ValueError, AttributeError):
                if os.path.exists(wl_fname):
                    os.remove(wl_fname)
            try:
                spm.to_xarray().to_netcdf(spm_fname)
            except (ValueError, AttributeError):
                if os.path.exists(spm_fname):
                    os.remove(spm_fname)

        return wl, spm

    def make_classification(
            self,
            input_features: Union[None, list] = None,
            output_features: Union[str, list] = None,
            st: Union[None, str] = "20110525 14:00:00",
            en: Union[None, str] = "20181027 00:00:00",
            freq: str = "6min",
            threshold: Union[int, dict] = 400,
            lookback_steps: int = None,
    ) -> pd.DataFrame:
        """
        Returns data for a classification problem.

        Parameters
        ----------
        input_features :
            names of inputs to use.
        output_features :
            feature/features to consider as target/output/label
        st :
            starting date of data. The default starting date is 20110525
        en :
            end date of data
        freq :
            frequency of data
        threshold :
            threshold to use to determine classes. Values greater than or
            equal to threshold are set to 1 while values smaller than
            threshold are set to 0. The value of 400 is chosen for E. coli
            to make the number of 0s and 1s balanced. It should be noted
            that US-EPA recommends a threshold value of 400 cfu/ml.
        lookback_steps:
            the number of previous steps to use. If this argument is used,
            the resultant dataframe will have
            (ecoli_observations * lookback_steps) rows. The resulting index
            will not be continuous.

        Returns
        -------
        pd.DataFrame
            a dataframe of shape `(inputs+target, st:en)`

        Example
        -------
        >>> from ai4water.datasets import MtropicsLaos
        >>> laos = MtropicsLaos()
        >>> df = laos.make_classification()
        """
        thresholds = {
            'Ecoli_mpn100': 400
        }

        target: list = check_attributes(output_features, self.target)

        data = self._make_ml_problem(input_features, target, st, en, freq)

        if len(target) == 1:
            threshold = threshold or thresholds[target[0]]
        else:
            raise ValueError("only a single output feature is supported")

        s = data[target[0]]
        s[s < threshold] = 0
        s[s >= threshold] = 1
        data[target[0]] = s

        if lookback_steps:
            return consider_lookback(data, lookback_steps, target)
        return data

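    # A minimal sketch with a user-defined threshold (the value 200 is
    # illustrative only); after thresholding the target column holds only
    # 0s and 1s:
    #
    # >>> laos = MtropicsLaos()
    # >>> df = laos.make_classification(threshold=200)
    # >>> sorted(df['Ecoli_mpn100'].dropna().unique())
    # [0.0, 1.0]
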
    def make_regression(
            self,
            input_features: Union[None, list] = None,
            output_features: Union[str, list] = "Ecoli_mpn100",
            st: Union[None, str] = "20110525 14:00:00",
            en: Union[None, str] = "20181027 00:00:00",
            freq: str = "6min",
            lookback_steps: int = None,
            replace_zeros_in_target: bool = True,
    ) -> pd.DataFrame:
        """
        Returns data for a regression problem using hydrological,
        environmental, and water quality data of Houay Pano.

        Parameters
        ----------
        input_features :
            names of inputs to use. By default following features are used
            as input

            - ``air_temp``
            - ``rel_hum``
            - ``wind_speed``
            - ``sol_rad``
            - ``water_level``
            - ``pcp``
            - ``susp_pm``
            - ``Ecoli_source``
        output_features :
            feature/features to consider as target/output/label
        st :
            starting date of data
        en :
            end date of data
        freq :
            frequency of data
        lookback_steps : int, default=None
            the number of previous steps to use. If this argument is used,
            the resultant dataframe will have
            (ecoli_observations * lookback_steps) rows. The resulting index
            will not be continuous.
        replace_zeros_in_target : bool, default=True
            Replace the zeroes in target column with 1s.

        Returns
        -------
        pd.DataFrame
            a dataframe of shape (inputs+target, st - en)

        Example
        -------
        >>> from ai4water.datasets import MtropicsLaos
        >>> laos = MtropicsLaos()
        >>> ins = ['pcp', 'air_temp']
        >>> out = ['Ecoli_mpn100']
        >>> reg_data = laos.make_regression(ins, out, '20110101', '20181231')

        todo add HRU definition
        """
        data = self._make_ml_problem(
            input_features, output_features, st, en, freq,
            replace_zeros_in_target=replace_zeros_in_target)

        if lookback_steps:
            return consider_lookback(data, lookback_steps, output_features)
        return data

    def _make_ml_problem(
            self, input_features, output_features, st, en, freq,
            replace_zeros_in_target: bool = True
    ):
        inputs = check_attributes(input_features, self.inputs)
        target = check_attributes(output_features, self.target)
        features_to_fetch = inputs + target

        pcp = self.fetch_pcp(st=st, en=en)
        pcp = pcp.interpolate('linear', limit=5)
        pcp = pcp.fillna(0.0)

        w = self.fetch_weather_station_data(st=st, en=en)
        assert int(w.isna().sum().sum()) == 0, f"{int(w.isna().sum().sum())}"
        w.columns = ['air_temp', 'rel_hum', 'wind_speed', 'sol_rad']

        w_6min = Resampler(w,
                           freq=freq,
                           how={'air_temp': 'linear',
                                'rel_hum': 'linear',
                                'wind_speed': 'linear',
                                'sol_rad': 'linear'
                                })()

        ecoli = self.fetch_ecoli(st=st, en=en)
        ecoli = ecoli.dropna()
        ecoli_6min = ecoli.resample(freq).mean()
        if replace_zeros_in_target:
            ecoli_6min.loc[ecoli_6min['Ecoli_mpn100'] == 0.0] = 1.0

        wl, spm = self.fetch_hydro(st=st, en=en)
        wl_6min = wl.resample(freq).first().interpolate(method="linear")
        spm_6min = spm.resample(freq).first().interpolate(method='linear')

        # backfilling because for each month the value is given for the
        # last day of the month
        src = self.fetch_source().loc[:, 'NB_E. coli_total'].asfreq("6min").bfill()
        src.name = "Ecoli_source"

        data = pd.concat([w_6min.loc[st:en],
                          pcp.loc[st:en],
                          wl_6min.loc[st:en],
                          spm_6min.loc[st:en],
                          src[st:en],
                          ecoli_6min.loc[st:en],
                          ], axis=1)

        if data['water_level'].isna().sum() < 15:
            data['water_level'] = data['water_level'].bfill()  # only 11 nans present at start
            data['water_level'] = data['water_level'].ffill()  # only 1 nan present at end

        if data['susp_pm'].isna().sum() < 40:
            data['susp_pm'] = data['susp_pm'].bfill()  # only 26 nans present at end
            data['susp_pm'] = data['susp_pm'].ffill()  # only 9 nans present

        return data.loc[st:en, features_to_fetch]

    def fetch_source(
            self
    ) -> pd.DataFrame:
        """
        returns monthly source data for E. coli from 2001 to 2021 obtained
        from `here <https://dataverse.ird.fr/dataset.xhtml?persistentId=doi:10.23708/7XJ3TB>`_

        Returns
        --------
        pd.DataFrame of shape (252, 19)
        """
        fname = os.path.join(self.ds_dir, "ecoli_source.csv")
        df = pd.read_csv(fname, sep="\t")
        df.index = pd.date_range("20010101", "20211231", freq="M")
        df.pop('Time')
        df.index.freq = pd.infer_freq(df.index)
        return df

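    # A minimal sketch; the source data is month-end indexed, which is why
    # ``_make_ml_problem`` backfills it to 6-minute steps:
    #
    # >>> laos = MtropicsLaos()
    # >>> src = laos.fetch_source()
    # >>> src.index.freqstr
    # 'M'
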
class MtropcsThailand(Datasets):
    url = {
        "pcp.zip":
            "https://services.sedoo.fr/mtropics/data/v1_0/download?collectionId=27c65b5f-59cb-87c1-4fdf-628e6143d8c4",
        # "hydro.zip":
        #     "https://services.sedoo.fr/mtropics/data/v1_0/download?collectionId=9e6f7144-8984-23bd-741a-06378fabd72",
        "rain_gauge.zip":
            "https://services.sedoo.fr/mtropics/data/v1_0/download?collectionId=0a12ffcf-42bc-0289-1c55-a769ef19bb16",
        "weather_station.zip":
            "https://services.sedoo.fr/mtropics/data/v1_0/download?collectionId=fa0bca5f-caee-5c68-fed7-544fe121dcf5"
    }

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

        self._download()


class MtropicsVietnam(Datasets):
    url = {
        "pcp.zip":
            "https://services.sedoo.fr/mtropics/data/v1_0/download?collectionId=d74ab1b0-379b-71cc-443b-662a73b7f596",
        "hydro.zip":
            "https://services.sedoo.fr/mtropics/data/v1_0/download?collectionId=85fb6717-4095-a2a2-34b5-4f1b70cfd304",
        # "lu.zip":
        #     "https://services.sedoo.fr/mtropics/data/v1_0/download?collectionId=c3724992-a043-4bbf-8ac1-bc6f9a608c1c",
        "rain_guage.zip":
            "https://services.sedoo.fr/mtropics/data/v1_0/download?collectionId=3d3382d5-08c1-2595-190b-8568a1d2d6af",
        "weather_station.zip":
            "https://services.sedoo.fr/mtropics/data/v1_0/download?collectionId=8df40086-4232-d8d0-a1ed-56c860818989"
    }

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

        self._download()


def _process_laos_shpfiles(shape_file, out_path):

    if fiona is None:
        warnings.warn("preprocessing of shapefiles can not be done because "
                      "no fiona installation is found.")
        return

    shp_reader = shapefile.Reader(shape_file)

    container = {
        'Forest': [],
        'Culture': [],
        'Fallow': [],
        'Teak': [],
        # 'others': []
    }

    for i in range(shp_reader.numRecords):
        lu = find_records(shape_file, 'LU3', i)
        shp = shp_reader.shape(i)
        if shp.shapeType == 0:
            continue
        geom = shape(shp.__geo_interface__)
        if lu.startswith('Forest'):
            container['Forest'].append(geom)
        elif lu.startswith('Culture'):
            container['Culture'].append(geom)
        elif lu.startswith('Fallow'):
            container['Fallow'].append(geom)
        elif lu.startswith('Teak'):
            container['Teak'].append(geom)
        else:  # just consider all others as 'culture' for simplicity
            container['Culture'].append(geom)
            # container['others'].append(geom)

    # Define a polygon feature geometry with one attribute
    schema = {
        'geometry': 'Polygon' if os.path.basename(shape_file) in [
            'LU2000.shp', 'LU2001.shp'] else 'MultiPolygon',
        'properties': {'id': 'int',
                       'NAME': 'str',
                       'area': 'float'},
    }

    # Write a new Shapefile
    with fiona.open(out_path, 'w', 'ESRI Shapefile', schema) as c:
        for idx, lu in enumerate(list(container.keys())):
            geoms = container[lu]
            poly = unary_union([shape(s.__geo_interface__) for s in geoms])

            assert poly.is_valid

            c.write({
                'geometry': mapping(poly),
                'properties': {'id': idx,
                               'NAME': lu,
                               'area': poly.area},
            })


def consider_lookback(df: pd.DataFrame, lookback: int, col_name: str) -> pd.DataFrame:
    """selects rows from dataframe considering lookback based upon nan values
    in col_name"""
    if isinstance(col_name, list):
        assert len(col_name) == 1
        col_name = col_name[0]
    if not isinstance(col_name, str):
        raise NotImplementedError

    start = False
    steps = 0
    masks = np.full(len(df), False)

    for idx, ecoli in enumerate(df[col_name].values[::-1]):

        if not ecoli != ecoli:  # the value is not NaN i.e. an observation
            start = True
            steps = 0

        if start and steps < lookback:
            masks[idx] = True
            steps += 1

        # if we have started counting but the limit has been reached
        if start and steps > lookback:
            start = False

    return df.iloc[masks[::-1]]


def ecoli_mekong(
        st: Union[str, pd.Timestamp, int] = "20110101",
        en: Union[str, pd.Timestamp, int] = "20211231",
        features: Union[str, list] = None,
        overwrite=False
) -> pd.DataFrame:
    """
    E. coli data from Mekong river (Houay Pano) area from 2011 to 2021
    Boithias et al., 2022 [1]_.

    Parameters
    ----------
    st :
        optional starting time. The default starting point is
        2011-05-25 10:00:00
    en :
        optional end time. The default end point is 2021-05-25 15:41:00
    features : str, optional
        names of features to use. use ``all`` to get all features. By default
        following input features are selected

        - ``station_name`` name of station/catchment where the observation was made
        - ``T`` temperature
        - ``EC`` electrical conductance
        - ``DOpercent`` dissolved oxygen concentration
        - ``DO`` dissolved oxygen saturation
        - ``pH`` pH
        - ``ORP`` oxidation-reduction potential
        - ``Turbidity`` turbidity
        - ``TSS`` total suspended sediment concentration
        - ``E-coli_4dilutions`` Escherichia coli concentration
    overwrite : bool
        whether to overwrite the downloaded file or not

    Returns
    -------
    pd.DataFrame
        with default parameters, the shape is (1602, 10)

    Examples
    --------
    >>> from ai4water.datasets import ecoli_mekong
    >>> ecoli_data = ecoli_mekong()
    >>> ecoli_data.shape
    (1602, 10)

    .. [1] https://essd.copernicus.org/preprints/essd-2021-440/
    """
    ecoli = ecoli_houay_pano(st, en, features, overwrite=overwrite)
    ecoli1 = ecoli_mekong_2016(st, en, features, overwrite=overwrite)
    ecoli2 = ecoli_mekong_laos(st, en, features, overwrite=overwrite)

    return pd.concat([ecoli, ecoli1, ecoli2])

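# A minimal sketch; ``ecoli_mekong`` simply concatenates the three
# station-level datasets defined below, so the result covers more than one
# station:
#
# >>> from ai4water.datasets import ecoli_mekong
# >>> data = ecoli_mekong()
# >>> data['station_name'].nunique() > 1
# True
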
def ecoli_mekong_2016(
        st: Union[str, pd.Timestamp, int] = "20160101",
        en: Union[str, pd.Timestamp, int] = "20161231",
        features: Union[str, list] = None,
        overwrite=False
) -> pd.DataFrame:
    """
    E. coli data from Mekong river from 2016 from 29 catchments

    Parameters
    ----------
    st :
        starting time
    en :
        end time
    features : str, optional
        names of features to use. use ``all`` to get all features.
    overwrite : bool
        whether to overwrite the downloaded file or not

    Returns
    -------
    pd.DataFrame
        with default parameters, the shape is (58, 10)

    Examples
    --------
    >>> from ai4water.datasets import ecoli_mekong_2016
    >>> ecoli = ecoli_mekong_2016()
    >>> ecoli.shape
    (58, 10)

    .. _url: https://dataverse.ird.fr/dataset.xhtml?persistentId=doi:10.23708/ZRSBM4
    """
    url = {"ecoli_mekong_2016.csv": "https://dataverse.ird.fr/api/access/datafile/8852"}
    ds_dir = os.path.join(os.path.dirname(__file__), 'data', 'ecoli_mekong_2016')

    return _fetch_ecoli(ds_dir, overwrite, url, None, features, st, en,
                        "ecoli_houay_pano_tab_file")

def ecoli_houay_pano(
        st: Union[str, pd.Timestamp, int] = "20110101",
        en: Union[str, pd.Timestamp, int] = "20211231",
        features: Union[str, list] = None,
        overwrite=False
) -> pd.DataFrame:
    """
    E. coli data from Mekong river (Houay Pano) area.

    Parameters
    ----------
    st :
        optional starting time. The default starting point is
        2011-05-25 10:00:00
    en :
        optional end time. The default end point is 2021-05-25 15:41:00
    features : str, optional
        names of features to use. use ``all`` to get all features. By default
        following input features are selected

        - ``station_name`` name of station/catchment where the observation was made
        - ``T`` temperature
        - ``EC`` electrical conductance
        - ``DOpercent`` dissolved oxygen concentration
        - ``DO`` dissolved oxygen saturation
        - ``pH`` pH
        - ``ORP`` oxidation-reduction potential
        - ``Turbidity`` turbidity
        - ``TSS`` total suspended sediment concentration
        - ``E-coli_4dilutions`` Escherichia coli concentration
    overwrite : bool
        whether to overwrite the downloaded file or not

    Returns
    -------
    pd.DataFrame
        with default parameters, the shape is (413, 10)

    Examples
    --------
    >>> from ai4water.datasets import ecoli_houay_pano
    >>> ecoli = ecoli_houay_pano()
    >>> ecoli.shape
    (413, 10)

    .. _url: https://dataverse.ird.fr/dataset.xhtml?persistentId=doi:10.23708/EWOYNK
    """
    url = {"ecoli_houay_pano_file.csv": "https://dataverse.ird.fr/api/access/datafile/9230"}
    ds_dir = os.path.join(os.path.dirname(__file__), 'data', 'ecoli_houay_pano')

    return _fetch_ecoli(ds_dir, overwrite, url, None, features, st, en,
                        "ecoli_houay_pano_tab_file")


def ecoli_mekong_laos(
        st: Union[str, pd.Timestamp, int] = "20110101",
        en: Union[str, pd.Timestamp, int] = "20211231",
        features: Union[str, list] = None,
        station_name: str = None,
        overwrite=False
) -> pd.DataFrame:
    """
    E. coli data from Mekong river (Northern Laos).

    Parameters
    ----------
    st :
        starting time
    en :
        end time
    station_name : str
    features : str, optional
    overwrite : bool
        whether to overwrite or not

    Returns
    -------
    pd.DataFrame
        with default parameters, the shape is (1131, 10)

    Examples
    --------
    >>> from ai4water.datasets import ecoli_mekong_laos
    >>> ecoli = ecoli_mekong_laos()
    >>> ecoli.shape
    (1131, 10)

    .. _url: https://dataverse.ird.fr/file.xhtml?fileId=9229&version=3.0
    """
    url = {"ecoli_mekong_loas_file.csv": "https://dataverse.ird.fr/api/access/datafile/9229"}
    ds_dir = os.path.join(os.path.dirname(__file__), 'data', 'ecoli_mekong_loas')

    return _fetch_ecoli(ds_dir, overwrite, url, station_name, features, st, en,
                        "ecoli_mekong_laos_tab_file")


def _fetch_ecoli(ds_dir, overwrite, url, station_name, features, st, en, _name):
    maybe_download(ds_dir, overwrite=overwrite, url=url, name=_name)
    all_files = os.listdir(ds_dir)
    assert len(all_files) == 1
    fname = os.path.join(ds_dir, all_files[0])
    df = pd.read_csv(fname, sep='\t')
    df.index = pd.to_datetime(df['Date_Time'])

    if station_name is not None:
        assert station_name in df['River'].unique().tolist()
        df = df.loc[df['River'] == station_name]

    if features is None:
        features = ['River', 'T', 'EC', 'DOpercent', 'DO', 'pH', 'ORP',
                    'Turbidity', 'TSS', 'E-coli_4dilutions']

    features = check_attributes(features, df.columns.tolist())
    df = df[features]

    # 'River' is not a representative name
    df = df.rename(columns={"River": "station_name"})

    if st:
        if isinstance(st, int):
            assert isinstance(en, int)
            df = df.iloc[st:en]
        else:
            df = df.loc[st:en]

    return df


def to_csv_and_read(
        xlsx_file: str,
        data_dir: str,
        sheed_id: str,
        **read_csv_kwargs
) -> pd.DataFrame:
    """converts the xlsx file to csv and reads it into a dataframe."""
    vbsfile = os.path.join(data_dir, 'ExcelToCsv.vbs')

    create_vbs_script(vbsfile)

    assert xlsx_file.endswith(".xlsx")

    fname = os.path.basename(xlsx_file).split('.')[0]
    # if not fname.startswith("~"):
    csv_fpath = os.path.join(data_dir, f"{fname}.csv")

    if not os.path.exists(csv_fpath):
        call(['cscript.exe', vbsfile, xlsx_file, csv_fpath, sheed_id])

    return pd.read_csv(csv_fpath, **read_csv_kwargs)


def create_vbs_script(vbsfile):
    with open(vbsfile, 'wb') as f:
        f.write(vbscript.encode('utf-8'))
    return


vbscript = """if WScript.Arguments.Count < 3 Then
    WScript.Echo "Please specify the source and the destination files. Usage: ExcelToCsv <xls/xlsx source file> <csv destination file> <worksheet number (starts at 1)>"
    Wscript.Quit
End If

csv_format = 6

Set objFSO = CreateObject("Scripting.FileSystemObject")

src_file = objFSO.GetAbsolutePathName(Wscript.Arguments.Item(0))
dest_file = objFSO.GetAbsolutePathName(WScript.Arguments.Item(1))
worksheet_number = CInt(WScript.Arguments.Item(2))

Dim oExcel
Set oExcel = CreateObject("Excel.Application")

Dim oBook
Set oBook = oExcel.Workbooks.Open(src_file)

oBook.Worksheets(worksheet_number).Activate
oBook.SaveAs dest_file, csv_format

oBook.Close False
oExcel.Quit
"""


def correct_time(df, col_name):
    # Excel stores time-only cells with a dummy 1899 date; keep just the
    # time part in that case.
    time = df[col_name].astype(str)
    ctime = []
    for i in time:
        if '1899' in i:
            ctime.append(i[11:])
        else:
            ctime.append(i)
    df[col_name] = ctime
    return df

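
# A small, self-contained demonstration of ``consider_lookback`` on a toy
# dataframe (synthetic values; no MtropicsLaos data needed). Each non-NaN
# target value keeps itself plus the preceding ``lookback - 1`` rows.
if __name__ == "__main__":
    _toy = pd.DataFrame(
        {"Ecoli_mpn100": [np.nan, np.nan, np.nan, 5.0,
                          np.nan, np.nan, np.nan, 7.0]},
        index=pd.date_range("20210101", periods=8, freq="D"))
    _selected = consider_lookback(_toy, lookback=3, col_name="Ecoli_mpn100")
    # Expect 6 rows: each of the two observations plus its two
    # preceding (NaN) rows.
    print(_selected)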