from typing import Union
from collections import OrderedDict
from .spatial_utils import find_records
from .spatial_utils import plot_shapefile
from .spatial_utils import get_total_area, GifUtil
from .spatial_utils import get_sorted_dict, get_areas_geoms, check_shp_validity
from ai4water.backend import os, np, pd, plt, mpl, shapefile, easy_mpl
mdates = mpl.dates
M2ToAcre = 0.0002471 # meter square to Acre
COLORS = ['#CDC0B0', '#00FFFF', '#76EEC6', '#C1CDCD', '#E3CF57', '#EED5B7',
'#8B7D6B', '#0000FF', '#8A2BE2', '#9C661F', '#FF4040', '#8A360F',
'#98F5FF', '#FF9912', '#B23AEE', '#9BCD9B', '#8B8B00']
[docs]class MakeHRUs(object):
"""
Distributes a given time series data for HRUs in a catchment according to
the `hru_definition`. Currently it is supposed that only land use changes
with time.
Example:
>>> import os
>>> from ai4water.preprocessing.spatial_processing import MakeHRUs
>>> # shapefile_paths is the path where shapefiles are located. todo
>>> SubBasin_shp = os.path.join(shapefile_paths, 'sub_basins.shp')
>>> shapefile_paths = os.path.join(os.getcwd(), 'shapefiles')
>>> hru_object = MakeHRUs('unique_lu_sub',
... index={2011: {'shapefile': os.path.join(shapefile_paths, 'lu2011.shp'), 'feature': 'NAME'},
... 2012: {'shapefile': os.path.join(shapefile_paths, 'lu2012.shp'), 'feature': 'NAME'}},
... subbasins_shape={'shapefile': SubBasin_shp, 'feature': 'id'}
... )
>>> hru_object.call()
"""
HRU_DEFINITIONS = [
'unique_sub',
'unique_lu',
'unique_soil',
'unique_slope',
'unique_lu_sub',
'unique_lu_soil',
'unique_lu_slope',
'unique_soil_sub',
'unique_soil_slope',
'unique_slope_sub',
'unique_lu_soil_sub',
'unique_lu_soil_slope',
]
[docs] def __init__(
self,
hru_definition:str,
index:dict,
soil_shape: Union[dict, None] = None,
slope_shape: Union[dict, None]=None,
subbasins_shape: Union[None, dict] = None,
save:bool = False,
show:bool = True,
verbosity: int = 1
):
"""
Parameters
----------
hru_definition :
hru definition. For valid hru_definitions check `MakeHRUs.HRU_DEFINITIONS`
index :
dictionary defining shapefiles of landuse which change with time.
For example in following a land use shapefile for two years is defined.
All attributes in land use shapefiles must have the feature `NAME`.
>>> {2011: {'shapefile': os.path.join(shapefile_paths, 'lu2011.shp'), 'feature': 'NAME'},
... 2012: {'shapefile': os.path.join(shapefile_paths, 'lu2012.shp'), 'feature': 'NAME'}}
soil_shape :
only applicable if `soil` exists in hru_definition.
All attributes in land use soil.shp must have the feature `NAME`.
>>> {'shapefile': os.path.join(shapefile_paths, 'soil.shp'), 'feature': 'NAME'}
slope_shape :
only applicable if `slope` exists in hru_definition.
All attributes in slope.shp shapefiles must have the feature `percent`.
>>> {'shapefile': os.path.join(shapefile_paths, 'slope.shp'), 'feature': 'percent'}
subbasins_shape :
only applicable if `sub` exists in hru_definition.
All attributes in land use shapefiles must have the feature `id`.
>>> {'shapefile': os.path.join(shapefile_paths, 'subbasins.shp'), 'feature': 'id'}
save : bool
show : bool, default=False
whether to save the plots or not.
verbosity :
Determines verbosity.
"""
if shapefile is None:
raise ModuleNotFoundError(f"You must install pyshp package e.g. pip install pyshp")
self.hru_definition = hru_definition
assert hru_definition in self.HRU_DEFINITIONS, f"""
invalid value for hru_definition '{hru_definition}' provided.
Allowed values are
{self.HRU_DEFINITIONS}"""
self.combinations = hru_definition.split('_')[1:]
if len(self.combinations)<2:
if isinstance(index, dict):
if not all([i.__class__.__name__=='NoneType' for i in index.values()]):
assert all([i.__class__.__name__=='NoneType' for i in [soil_shape, slope_shape, subbasins_shape]]), f"""
hru consists of only one feature i.e. {self.combinations[0]}. Thus if index is provided then not
other shapefile must be given.
"""
self.index = index
self.soil_shape = soil_shape
self.slope_shape = slope_shape
self.sub_shape = subbasins_shape
self.hru_paras = OrderedDict()
self.hru_geoms = OrderedDict()
self.all_hrus = []
self.hru_names = []
self.save = save
self.show = show
self.verbosity = verbosity
st, en = list(index.keys())[0], list(index.keys())[-1]
# initiating yearly dataframes
self.area = pd.DataFrame(index=pd.date_range(str(st) + '0101', str(en) + '1231', freq='12m'))
self.curve_no = pd.DataFrame(index=pd.date_range(str(st) + '0101', str(en) + '1231', freq='12m'))
# distance_to_outlet
self.dist_to_out = pd.DataFrame(index=pd.date_range(str(st) + '0101', str(en) + '1231',
freq='12m'))
# area of HRU as fraction of total catchment
self.area_frac_cat = pd.DataFrame(index=pd.date_range(str(st) + '0101', str(en) + '1231',
freq='12m'))
[docs] def call(self, plot_hrus=True):
"""
Makes the HRUs.
Parameters
----------
plot_hrus :
If true, the exact area hrus will be plotted as well.
"""
for _yr, shp_file in self.index.items():
_hru_paras, _hru_geoms = self.get_hrus(shp_file, _yr)
self.hru_paras.update(_hru_paras)
if plot_hrus:
self.plot_hrus(year=_yr, _polygon_dict=self.hru_geoms, nrows=3, ncols=4,
bbox=self.slope_shape, annotate=False,
name=self.hru_definition)
return
[docs] def get_hrus(self,
idx_shp,
year
):
"""
idx_shp :
shapefile whose area distribution changes with time e.g land use
"""
if self.verbosity > 0:
print('Checking validity of landuse shapefile')
hru_paras = OrderedDict()
a = 0
if len(self.combinations) ==1:
shp_name = self.combinations[0]
if idx_shp is None:
shp_file = getattr(self, f'{shp_name}_shape')['shapefile']
feature = getattr(self, f'{shp_name}_shape')['feature']
else:
shp_file = idx_shp['shapefile']
feature = idx_shp['feature']
shp_reader = shapefile.Reader(shp_file)
_, shp_geom_list = get_areas_geoms(shp_reader)
if 'sub' not in self.hru_definition:
shp_geom_list = check_shp_validity(shp_geom_list, len(shp_reader.shapes()), name=shp_name,
verbosity=self.verbosity)
self.tot_cat_area = get_total_area(shp_geom_list)
for shp in range(len(shp_reader.shapes())):
code = f'{str(year)}_{shp_name}_{find_records(shp_file, feature, shp)}'
hru_paras[code] = {'yearless_key': code[4:]}
intersection = shp_geom_list[shp]
self.hru_geoms[code] = [intersection, shp_geom_list[shp]]
self._foo(code, intersection)
if len(self.combinations) == 2:
first_shp_name = self.combinations[0]
second_shp_name = self.combinations[1]
if idx_shp is None:
first_shp_file = getattr(self, f'{first_shp_name}_shape')['shapefile']
first_feature = getattr(self, f'{first_shp_name}_shape')['feature']
else:
first_shp_file = idx_shp['shapefile']
first_feature = idx_shp['feature']
second_shp_file = getattr(self, f'{second_shp_name}_shape')['shapefile']
second_feature = getattr(self, f'{second_shp_name}_shape')['feature']
first_shp_reader = shapefile.Reader(first_shp_file)
second_shp_reader = shapefile.Reader(second_shp_file)
_, first_shp_geom_list = get_areas_geoms(first_shp_reader)
_, second_shp_geom_list = get_areas_geoms(second_shp_reader)
if 'sub' not in self.hru_definition:
second_shp_geom_list = check_shp_validity(second_shp_geom_list, len(second_shp_reader.shapes()),
name=second_shp_name, verbosity=self.verbosity)
self.tot_cat_area = get_total_area(first_shp_geom_list)
else:
self.tot_cat_area = get_total_area(second_shp_geom_list)
for j in range(len(second_shp_reader.shapes())):
for lu in range(len(first_shp_reader.shapes())):
a += 1
intersection = second_shp_geom_list[j].intersection(first_shp_geom_list[lu])
lu_code = find_records(first_shp_file, first_feature, lu)
sub_code = find_records(second_shp_file, second_feature, j)
sub_code = f'_{second_shp_name}_' + str(sub_code)
code = str(year) + sub_code + f'_{first_shp_name}_' + lu_code #, lu_code
self.hru_geoms[code] = [intersection, second_shp_geom_list[j], first_shp_geom_list[lu]]
hru_paras[code] = {'yearless_key': code[4:]}
self._foo(code, intersection)
if len(self.combinations) == 3:
first_shp_name = self.combinations[0]
second_shp_name = self.combinations[1]
third_shp_name = self.combinations[2]
if idx_shp is None:
first_shp_file = getattr(self, f'{first_shp_name}_shape')['shapefile']
first_feature = getattr(self, f'{first_shp_name}_shape')['feature']
else:
first_shp_file = idx_shp['shapefile']
first_feature = idx_shp['feature']
second_shp_file = getattr(self, f'{second_shp_name}_shape')['shapefile']
second_feature = getattr(self, f'{second_shp_name}_shape')['feature']
third_shp_file = getattr(self, f'{third_shp_name}_shape')['shapefile']
third_feature = getattr(self, f'{third_shp_name}_shape')['feature']
first_shp_reader = shapefile.Reader(first_shp_file)
second_shp_reader = shapefile.Reader(second_shp_file)
third_shp_reader = shapefile.Reader(third_shp_file)
_, first_shp_geom_list = get_areas_geoms(first_shp_reader)
_, second_shp_geom_list = get_areas_geoms(second_shp_reader)
_, third_shp_geom_list = get_areas_geoms(third_shp_reader)
if 'sub' not in self.hru_definition: # todo
second_shp_geom_list = check_shp_validity(second_shp_geom_list, len(second_shp_reader.shapes()),
name=second_shp_name,
verbosity=self.verbosity)
self.tot_cat_area = get_total_area(first_shp_geom_list)
else:
self.tot_cat_area = get_total_area(second_shp_geom_list)
for s in range(len(third_shp_reader.shapes())):
for j in range(len(second_shp_reader.shapes())):
for lu in range(len(first_shp_reader.shapes())):
intersection = second_shp_geom_list[j].intersection(first_shp_geom_list[lu])
sub = third_shp_geom_list[s]
intersection = sub.intersection(intersection)
sub_code = f'_{third_shp_name}_' + str(find_records(third_shp_file, third_feature, s))
lu_code = find_records(first_shp_file, first_feature, lu)
soil_code = find_records(second_shp_file, second_feature, j)
code = str(year) + sub_code + f'_{second_shp_name}_' + str(soil_code) + f'_{first_shp_name}_' + lu_code
self.hru_geoms[code] = [intersection, second_shp_geom_list[j], first_shp_geom_list[lu]]
hru_paras[code] = {'yearless_key': code[4:]}
self._foo(code, intersection)
# if sum(LuAreaListAcre) > sum(SubAreaListAcre):
# print('Land use area is bigger than subbasin area')
# IntArea_ = [[None] * no_of_subs for _ in range(no_of_lus)]
# IntLuArea = [None] * no_of_lus
# for lu in range(no_of_lus):
# # Int = 0
# IntArea = 0
# for sub_ind in range(no_of_subs):
# Int = LuShpGeomListNew[lu].intersection(SubShpGeomList[sub_ind])
# code, lu_code = get_code(year=year, lu_feature=LuShpGeomListNew[lu], lu_feat_ind=lu,
# sub_feat=SubShpGeomList[sub_ind], sub_feat_ind=sub_ind, _hru_def=_hru_def)
# IntArea += Int.area * M2ToAcre
# anarea = Int.area * M2ToAcre
# IntArea_[lu][sub_ind] = anarea
# print('area of {0} is reduced from {1:10.3f} acres to {2:10.3f}'
# .format(lu_code, LuAreaListAcre[lu], IntArea)) # lu_code/j
# IntLuArea[lu] = IntArea
# print('New area of all landuses is {}'.format(sum(IntLuArea)))
# else:
# print('Land use area is equal to subbasin area')
self.hru_names = list(set(self.hru_names))
return hru_paras, self.hru_geoms
def _foo(self, code, intersection):
hru_name = code[5:]
year = code[0:4]
row_index = pd.to_datetime(year + '0131', format='%Y%m%d', errors='ignore')
self.hru_names.append(hru_name)
self.all_hrus.append(code)
anarea = intersection.area * M2ToAcre
# saving value of area for currnet HRU and for for current year in dictionary
self.area.loc[row_index, hru_name] = anarea
self.area_frac_cat.loc[row_index, hru_name] = anarea / self.tot_cat_area
return
[docs] def plot_hrus(self, year, bbox, _polygon_dict, annotate=False, nrows=3,
ncols=4, name='',
annotate_missing_hru=False)->plt.Figure:
polygon_dict = OrderedDict()
for k, v in _polygon_dict.items():
if str(year) in k[0:4]:
polygon_dict[k] = v
# sorting dictionary based on keys so that it puts same HRU at same place for each year
polygon_dict = get_sorted_dict(polygon_dict)
figure, axs = plt.subplots(nrows, ncols)
if isinstance(bbox, str):
r = shapefile.Reader(bbox)
bbox = r.bbox
r.close()
figure.set_figwidth(27)
figure.set_figheight(12)
axis_l = [item for sublist in list(axs) for item in sublist]
# max_bbox = get_bbox_with_max_area(_polygon_dict)
i = 0
for key, axis in zip(polygon_dict, axis_l):
i += 1
ob = polygon_dict[key][0]
# text0 = key.split('_')[4]+' in '+key.split('_')[1] +' '+ key.split('_')[2]
if ob.type == 'MultiPolygon':
anfang_x = [None] * len(ob)
for s_ob in range(len(ob)):
ob_ = ob[s_ob]
x, y = ob_.exterior.xy
axis.plot(x, y, color=np.random.rand(3, ), alpha=0.7, linewidth=3, solid_capstyle='round', zorder=2)
axis.get_yaxis().set_visible(False)
axis.get_xaxis().set_visible(False)
if bbox is not None:
axis.set_ylim([bbox[1], bbox[3]])
axis.set_xlim([bbox[0], bbox[2]])
anfang_x[s_ob] = x[0]
if annotate:
axis.annotate(key[3:], xy=(0.2, 0.1), xycoords='axes fraction', fontsize=18)
elif ob.type == 'Polygon':
x, y = polygon_dict[key][0].exterior.xy
axis.plot(x, y, color=np.random.rand(3, ), alpha=0.7, linewidth=3, solid_capstyle='round', zorder=2)
axis.get_yaxis().set_visible(False)
axis.get_xaxis().set_visible(False)
if annotate:
axis.annotate(key[3:], xy=(0.2, 0.1), xycoords='axes fraction', fontsize=18)
if bbox is not None:
axis.set_ylim([bbox[1], bbox[3]])
axis.set_xlim([bbox[0], bbox[2]])
# axis.text(x[0], np.mean(y), text0, color='red', fontsize='12')
else: # for empty cases
x, y = np.arange(0, 10), np.arange(0, 10)
axis.plot(x, y, color='w')
text1 = 'no ' + key.split('_')[1] + ' in sub-basin ' + key.split('_')[2]
if annotate_missing_hru:
axis.text(x[0], np.mean(y), text1, color='red', fontsize=16)
axis.get_yaxis().set_visible(False)
axis.get_xaxis().set_visible(False)
figure.suptitle('HRUs for year {}'.format(year), fontsize=22)
# plt.title('HRUs for year {}'.format(year), fontsize=22)
if self.save:
name = 'hrus_{}.png'.format(year) if name is None else name + str(year)
plt.savefig(name, bbox_inches="tight")
if self.show:
plt.show()
return figure
[docs] def plot_as_ts(
self,
marker='o',
ms=8,
min_xticks=None, max_xticks=None,
figsize=None,
**kwargs)->plt.Axes:
"""
parameters
----------
marker :
ms :
marker size as integer
min_xticks : int
minimum ticks on x-axis
max_xticks : int
maximum ticks on x-axis
figsize :
figure size
**kwargs
any keyword arguments for easy_mpl.plot
hru_object.plot_as_ts()"""
figsize = figsize or (12, 6)
legend_kws = dict(fontsize=14, markerscale=2, bbox_to_anchor=(1.1, 0.99))
ax_kws = dict(
xlabel="Time", xlabel_kws=dict(fontsize=18),
ylabel="Area (Acres)", ylabel_kws=dict(fontsize=18),
legend_kws=legend_kws,
title = 'Variation of Area (acre) of HRUs with time'
)
plt.close('all')
_, axis = plt.subplots(figsize=figsize)
for area in self.area:
easy_mpl.plot(self.area[area], marker=marker,
mfc='white', ms=ms, lw=4,
label=area,
ax=axis, ax_kws=ax_kws, show=False, **kwargs)
axis.tick_params(color='lightgrey', labelsize=14, labelcolor='grey')
axis.grid(ls='--', color='lightgrey')
if min_xticks is not None:
assert isinstance(min_xticks, int)
assert isinstance(max_xticks, int)
loc = mdates.AutoDateLocator(minticks=4, maxticks=6)
axis.xaxis.set_major_locator(loc)
fmt = mdates.AutoDateFormatter(loc)
axis.xaxis.set_major_formatter(fmt)
if self.save:
plt.savefig(f'{self.hru_definition}_hru_as_ts.png', dpi=300, bbox_inches="tight")
if self.show:
plt.show()
return axis
[docs] def plot_hru_evolution(self, hru_name, make_gif=False):
"""
plots how the hru evolved during the years
Parameters
----------
hru_name : str,
name of hru to be plotted
make_gif : bool
if True, a gif file will be created from evolution plots
Returns
-------
"""
for yr in self.index.keys():
y = str(yr)[2:] + '_'
hru_name_year = y + hru_name # hru name with year
self.plot_hru(hru_name_year, self.soil_shape)
if make_gif:
gif = GifUtil(folder=os.path.join(os.getcwd(), 'plots'), contains=hru_name)
gif.make_gif()
gif.remove_images()
return
[docs] def make_gif(self):
gif = GifUtil(initials=self.hru_definition, folder=os.path.join(os.getcwd(), 'plots'))
gif.make_gif()
gif.remove_images()
return
[docs] def plot(self, what:str, index=None, show_all_together=True):
assert what in ['landuse', 'soil', 'subbasins', 'slope']
if what == 'landuse':
assert index
shp_file = self.index[index]
else:
shp_file = getattr(self, f'{what}_shape')
return plot_shapefile(shp_file, show_all_together)
[docs] def plot_hru(self, hru_name, bbox=None)->plt.Figure:
"""
plot only one hru from `hru_geoms`.
The value of each key in hru_geoms is a list with three shapes
Examples
--------
>>> self.plot_an_hru(self.hru_names[0], bbox=True)
"""
shape_list = self.hru_geoms[hru_name]
figure, (axis_list) = plt.subplots(3)
figure.set_figheight(14)
if bbox:
r = shapefile.Reader(bbox)
bbox = r.bbox
i = 0
leg = hru_name
for axs, ob in zip(axis_list, shape_list):
i +=1
if i==2: leg = hru_name.split('_')[1:2]
elif i==3: leg = hru_name.split('_')[-1]
if ob.type == 'MultiPolygon':
for s_ob in range(len(ob)):
ob_ = ob[s_ob]
x, y = ob_.exterior.xy
axs.plot(x, y, color=COLORS[i], alpha=0.7, linewidth=3, solid_capstyle='round', zorder=2)
elif ob.type == 'Polygon':
x, y = ob.exterior.xy
axs.plot(x, y, color=COLORS[i], alpha=0.7, linewidth=3, solid_capstyle='round', zorder=2)
axs.set_title(leg, fontsize=14)
if bbox:
axs.set_ylim([bbox[1], bbox[3]])
axs.set_xlim([bbox[0], bbox[2]])
if self.save:
plt.savefig(hru_name + '.png')
if self.show:
plt.show()
return figure
[docs] def draw_pie(
self,
year:int,
n_merge:int=0,
shadow:bool = True,
title:bool=False,
**kwargs
)->tuple:
"""
todo draw nested pie chart for all years
https://matplotlib.org/stable/api/_as_gen/matplotlib.axes.Axes.pie.html
Draws a pie chart showing relative area of HRUs for a particular year.
Since the hrus can change with time, selecting one year is based on supposition
that area of hrus remain constant during the whole year.
Parameters
----------
year : int,
the year for which area of hrus will be used.
n_merge :
number of hrus to merge
shadow : bool
title :
**kwargs :
any keyword arguments for `easy_mpl.pie`
Returns
--------
tuple
"""
idx = str(year) + '-01-31'
area_unsort = self.area.loc[idx]
area = area_unsort.sort_values()
merged = area[area.index[0]:area.index[n_merge-1]]
rest = area[area.index[n_merge]:]
if n_merge==0:
assert len(merged) == len(area)
merged_vals= []
merged_labels = []
else:
merged_vals = [sum(merged.values)]
merged_labels = ['{} HRUs'.format(n_merge)]
vals = list(rest.values) + merged_vals
labels = list(rest.keys()) + merged_labels
explode = [0 for _ in range(len(vals))]
explode[-1] = 0.1
labels_n = []
for l in labels:
labels_n.append(l.replace('lu_', ''))
if title:
title = 'Areas of HRUs for year {}'.format(year)
outs = easy_mpl.pie(fractions=vals,
labels=labels_n,
explode=tuple(explode),
shadow=shadow,
ax_kws=dict(title=title), show=False,
**kwargs)
name = f'{len(self.hru_names)}hrus_for_{year}_{self.hru_definition}.png'
if self.save:
plt.savefig(name, dpi=300, bbox_inches="tight")
if self.show:
plt.show()
return outs