Source code for flopy.utils.rasters

import os
import warnings
from typing import Union

import numpy as np

from .geometry import Polygon
from .utl_import import import_optional_dependency

warnings.simplefilter("always", DeprecationWarning)


[docs]class Raster: """ The Raster object is used for cropping, sampling raster values, and re-sampling raster values to grids, and provides methods to plot rasters and histograms of raster digital numbers for visualization and analysis purposes. Parameters ---------- array : np.ndarray a three dimensional array of raster values with dimensions defined by (raster band, nrow, ncol) bands : tuple a tuple of raster bands crs : int, string, rasterio.crs.CRS object either a epsg code, a proj4 string, or a CRS object transform : affine.Affine object affine object, which is used to define geometry nodataval : float raster no data value rio_ds : DatasetReader object rasterIO dataset Reader object Notes ----- Examples -------- >>> from flopy.utils import Raster >>> >>> rio = Raster.load("myraster.tif") """ FLOAT32 = (float, np.float32, np.float64) FLOAT64 = (np.float64,) INT8 = (np.int8, np.uint8) INT16 = (np.int16, np.uint16) INT32 = (int, np.int32, np.uint32, np.uint, np.uintc, np.uint32) INT64 = (np.int64, np.uint64) def __init__( self, array, bands, crs, transform, nodataval, driver="GTiff", rio_ds=None, ): from .geometry import point_in_polygon rasterio = import_optional_dependency("rasterio") from rasterio.crs import CRS self._affine = import_optional_dependency("affine") self._point_in_polygon = point_in_polygon self._array = array self._bands = bands meta = {"driver": driver, "nodata": nodataval} # create metadata dictionary if array.dtype in Raster.FLOAT32: dtype = "float32" elif array.dtype in Raster.FLOAT64: dtype = "float64" elif array.dtype in Raster.INT8: dtype = "int8" elif array.dtype in Raster.INT16: dtype = "int16" elif array.dtype in Raster.INT32: dtype = "int32" elif array.dtype in Raster.INT64: dtype = "int64" else: raise TypeError("dtype cannot be determined from Raster") meta["dtype"] = dtype if isinstance(crs, CRS): pass elif isinstance(crs, int): crs = CRS.from_epsg(crs) elif isinstance(crs, str): crs = CRS.from_string(crs) else: TypeError("crs type not understood, provide an epsg or proj4") meta["crs"] = crs count, height, width = array.shape meta["count"] = count meta["height"] = height meta["width"] = width if not isinstance(transform, self._affine.Affine): raise TypeError("Transform must be defined by an Affine object") meta["transform"] = transform self._meta = meta self._dataset = None self.__arr_dict = { self._bands[b]: arr for b, arr in enumerate(self._array) } self.__xcenters = None self.__ycenters = None if isinstance(rio_ds, rasterio.io.DatasetReader): self._dataset = rio_ds @property def bounds(self): """ Returns a tuple of xmin, xmax, ymin, ymax boundaries """ height = self._meta["height"] width = self._meta["width"] transform = self._meta["transform"] xmin = transform[2] ymax = transform[5] xmax, ymin = transform * (width, height) return xmin, xmax, ymin, ymax @property def bands(self): """ Returns a tuple of raster bands """ if self._dataset is None: return tuple(self._bands) else: return self._dataset.indexes @property def nodatavals(self): """ Returns a Tuple of values used to define no data """ if self._dataset is None: if isinstance(self._meta["nodata"], list): nodata = tuple(self._meta["nodata"]) elif isinstance(self._meta["nodata"], tuple): nodata = self._meta["nodata"] else: nodata = (self._meta["nodata"],) return nodata else: return self._dataset.nodatavals @property def xcenters(self): """ Returns a np.ndarray of raster x cell centers """ if self.__xcenters is None: self.__xycenters() return self.__xcenters @property def ycenters(self): """ Returns a np.ndarray of raster y cell centers """ if self.__ycenters is None: self.__xycenters() return self.__ycenters def __xycenters(self): """ Method to create np.arrays of the xy-cell centers in the raster object """ arr = None for _, arr in self.__arr_dict.items(): break if arr is None: raise AssertionError("No array data was found") ylen, xlen = arr.shape # assume that transform is an unrotated plane # if transform indicates a rotated plane additional # processing will need to be added in this portion of the code xd = abs(self._meta["transform"][0]) yd = abs(self._meta["transform"][4]) x0, x1, y0, y1 = self.bounds # adjust bounds to centroids x0 += xd / 2.0 x1 -= xd / 2.0 y0 += yd / 2.0 y1 -= yd / 2.0 x = np.linspace(x0, x1, xlen) y = np.linspace(y1, y0, ylen) self.__xcenters, self.__ycenters = np.meshgrid(x, y)
[docs] def sample_point(self, *point, band=1): """ Method to get nearest raster value at a user provided point Parameters ---------- *point : point geometry representation accepted data types: x, y values : ex. sample_point(1, 3, band=1) tuple of x, y: ex sample_point((1, 3), band=1) shapely.geometry.Point geojson.Point flopy.geometry.Point band : int raster band to re-sample Returns ------- value : float """ from .geospatial_utils import GeoSpatialUtil if isinstance(point[0], (tuple, list, np.ndarray)): point = point[0] geom = GeoSpatialUtil(point, shapetype="Point") x, y = geom.points # 1: get grid. rxc = self.xcenters ryc = self.ycenters # 2: apply distance equation xt = (rxc - x) ** 2 yt = (ryc - y) ** 2 dist = np.sqrt(xt + yt) # 3: find indices of minimum distance md = np.where(dist == np.nanmin(dist)) # 4: sample the array and average if necessary vals = [] arr = self.get_array(band) for ix, i in enumerate(md[0]): j = md[1][ix] vals.append(arr[i, j]) value = np.nanmean(vals) return value
[docs] def sample_polygon(self, polygon, band, invert=False, **kwargs): """ Method to get an unordered list of raster values that are located within a arbitrary polygon Parameters ---------- polygon : list, geojson, shapely.geometry, shapefile.Shape sample_polygon method accepts any of these geometries: a list of (x, y) points, ex. [(x1, y1), ...] geojson Polygon object shapely Polygon object shapefile Polygon shape flopy Polygon shape band : int raster band to re-sample invert : bool Default value is False. If invert is True then the area inside the shapes will be masked out Returns ------- np.ndarray of unordered raster values """ if band not in self.bands: err = ( "Band number is not recognized, use self.bands for a list " "of raster bands" ) raise AssertionError(err) if self._dataset is not None: arr_dict = self._sample_rio_dataset(polygon, invert)[0] for b, arr in arr_dict.items(): for val in self.nodatavals: t = arr[arr != val] arr_dict[b] = t else: mask = self._intersection(polygon, invert, **kwargs) arr_dict = {} for b, arr in self.__arr_dict.items(): t = arr[mask] arr_dict[b] = t return arr_dict[band]
[docs] def resample_to_grid( self, modelgrid, band, method="nearest", multithread=False, thread_pool=2, extrapolate_edges=False, ): """ Method to resample the raster data to a user supplied grid of x, y coordinates. x, y coordinate arrays should correspond to grid vertices Parameters ---------- modelgrid : flopy.Grid object model grid to sample data from band : int raster band to re-sample method : str resampling methods ``linear`` for bi-linear interpolation ``nearest`` for nearest neighbor ``cubic`` for bi-cubic interpolation ``mean`` for mean sampling ``median`` for median sampling ``min`` for minimum sampling ``max`` for maximum sampling `'mode'` for majority sampling multithread : bool DEPRECATED boolean flag indicating if multithreading should be used with the ``mean`` and ``median`` sampling methods thread_pool : int DEPRECATED number of threads to use for mean and median sampling extrapolate_edges : bool boolean flag indicating if areas without data should be filled using the ``nearest`` interpolation method. This option has no effect when using the ``nearest`` interpolation method. Returns ------- np.array """ if multithread: warnings.warn( "multithread option has been deprecated and will be removed " "in flopy version 3.3.8" ) import_optional_dependency("scipy") rasterstats = import_optional_dependency("rasterstats") from scipy.interpolate import griddata method = method.lower() if method in ("linear", "nearest", "cubic"): xc = modelgrid.xcellcenters yc = modelgrid.ycellcenters data_shape = xc.shape xc = xc.flatten() yc = yc.flatten() # step 1: create grid from raster bounds rxc = self.xcenters ryc = self.ycenters # step 2: flatten grid rxc = rxc.flatten() ryc = ryc.flatten() # step 3: get array if method == "cubic": arr = self.get_array(band, masked=False) else: arr = self.get_array(band, masked=True) arr = arr.flatten() # step 3: use griddata interpolation to snap to grid data = griddata( (rxc, ryc), arr, (xc, yc), method=method, ) elif method in ("median", "mean", "min", "max", "mode"): # these methods are slow and could use speed ups data_shape = modelgrid.xcellcenters.shape if method == "mode": method = "majority" xv, yv = modelgrid.cross_section_vertices polygons = [list(zip(x, yv[ix])) for ix, x in enumerate(xv)] polygons = [Polygon(p) for p in polygons] rstr = self.get_array(band, masked=False) affine = self._meta["transform"] nodata = self.nodatavals[0] zs = rasterstats.zonal_stats( polygons, rstr, affine=affine, stats=[method], nodata=nodata ) data = np.array( [z[method] if z[method] is not None else np.nan for z in zs] ) else: raise TypeError(f"{method} method not supported") if extrapolate_edges and method != "nearest": xc = modelgrid.xcellcenters yc = modelgrid.ycellcenters xc = xc.flatten() yc = yc.flatten() # step 1: create grid from raster bounds rxc = self.xcenters ryc = self.ycenters # step 2: flatten grid rxc = rxc.flatten() ryc = ryc.flatten() arr = self.get_array(band, masked=True).flatten() # filter out nan values from the original dataset if np.isnan(np.sum(arr)): idx = np.isfinite(arr) rxc = rxc[idx] ryc = ryc[idx] arr = arr[idx] extrapolate = griddata( (rxc, ryc), arr, (xc, yc), method="nearest", ) data = np.where(np.isnan(data), extrapolate, data) # step 4: return grid to user in shape provided data.shape = data_shape # step 5: re-apply nodata values data[np.isnan(data)] = self.nodatavals[0] return data
[docs] def crop(self, polygon, invert=False): """ Method to crop a new raster object from the current raster object Parameters ---------- polygon : list, geojson, shapely.geometry, shapefile.Shape crop method accepts any of these geometries: a list of (x, y) points, ex. [(x1, y1), ...] geojson Polygon object shapely Polygon object shapefile Polygon shape flopy Polygon shape invert : bool Default value is False. If invert is True then the area inside the shapes will be masked out """ if self._dataset is not None: arr_dict, rstr_crp_meta = self._sample_rio_dataset(polygon, invert) self.__arr_dict = arr_dict self._meta = rstr_crp_meta self._dataset = None self.__xcenters = None self.__ycenters = None else: mask = self._intersection(polygon, invert) xc = self.xcenters yc = self.ycenters # step 4: find bounding box xba = np.copy(xc) yba = np.copy(yc) xba[~mask] = np.nan yba[~mask] = np.nan xmin = np.nanmin(xba) xmax = np.nanmax(xba) ymin = np.nanmin(yba) ymax = np.nanmax(yba) bbox = [(xmin, ymin), (xmin, ymax), (xmax, ymax), (xmax, ymin)] # step 5: use bounding box to crop array xind = [] yind = [] for pt in bbox: xt = (pt[0] - xc) ** 2 yt = (pt[1] - yc) ** 2 hypot = np.sqrt(xt + yt) ind = np.where(hypot == np.min(hypot)) yind.append(ind[0][0]) xind.append(ind[1][0]) xmii = np.min(xind) xmai = np.max(xind) ymii = np.min(yind) ymai = np.max(yind) crp_mask = mask[ymii : ymai + 1, xmii : xmai + 1] nodata = self._meta["nodata"] if not isinstance(nodata, float) and not isinstance(nodata, int): try: nodata = nodata[0] except (IndexError, TypeError): nodata = -1.0e38 self._meta["nodata"] = nodata arr_dict = {} for band, arr in self.__arr_dict.items(): t = arr[ymii : ymai + 1, xmii : xmai + 1] t[~crp_mask] = nodata arr_dict[band] = t self.__arr_dict = arr_dict # adjust xmin, ymax back to appropriate grid locations xd = abs(self._meta["transform"][0]) yd = abs(self._meta["transform"][4]) xmin -= xd / 2.0 ymax += yd / 2.0 # step 6: update metadata including a new Affine self._meta["height"] = crp_mask.shape[0] self._meta["width"] = crp_mask.shape[1] transform = self._meta["transform"] self._meta["transform"] = self._affine.Affine( transform[0], transform[1], xmin, transform[3], transform[4], ymax, ) self.__xcenters = None self.__ycenters = None
def _sample_rio_dataset(self, polygon, invert): """ Internal method to sample a rasterIO dataset using rasterIO built ins Parameters ---------- polygon : list, geojson, shapely.geometry, shapefile.Shape _sample_rio_dataset method accepts any of these geometries: a list of (x, y) points, ex. [(x1, y1), ...] geojson Polygon object shapely Polygon object shapefile Polygon shape flopy Polygon shape invert : bool Default value is False. If invert is True then the area inside the shapes will be masked out Returns ------- tuple : (arr_dict, raster_crp_meta) """ import_optional_dependency("rasterio") from rasterio.mask import mask from .geospatial_utils import GeoSpatialUtil if isinstance(polygon, (list, tuple, np.ndarray)): polygon = [polygon] geom = GeoSpatialUtil(polygon, shapetype="Polygon") shapes = [geom] rstr_crp, rstr_crp_affine = mask( self._dataset, shapes, crop=True, invert=invert ) rstr_crp_meta = self._dataset.meta.copy() rstr_crp_meta.update( { "driver": "GTiff", "height": rstr_crp.shape[1], "width": rstr_crp.shape[2], "transform": rstr_crp_affine, } ) arr_dict = {self.bands[b]: arr for b, arr in enumerate(rstr_crp)} return arr_dict, rstr_crp_meta def _intersection(self, polygon, invert, **kwargs): """ Internal method to create an intersection mask, used for cropping arrays and sampling arrays. Parameters ---------- polygon : list, geojson, shapely.geometry, shapefile.Shape _intersection method accepts any of these geometries: a list of (x, y) points, ex. [(x1, y1), ...] geojson Polygon object shapely Polygon object shapefile Polygon shape flopy Polygon shape invert : bool Default value is False. If invert is True then the area inside the shapes will be masked out Returns ------- mask : np.ndarray (dtype = bool) """ # the convert kwarg is to speed up the resample_to_grid method # which already provides the proper datatype to _intersect() convert = kwargs.pop("convert", True) if convert: from .geospatial_utils import GeoSpatialUtil if isinstance(polygon, (list, tuple, np.ndarray)): polygon = [polygon] geom = GeoSpatialUtil(polygon, shapetype="Polygon") polygon = list(geom.points[0]) # step 2: create a grid of centoids xc = self.xcenters yc = self.ycenters # step 3: do intersection mask = self._point_in_polygon(xc, yc, polygon) if invert: mask = np.invert(mask) return mask
[docs] def get_array(self, band, masked=True): """ Method to get a numpy array corresponding to the provided raster band. Nodata vals are set to np.nan Parameters ---------- band : int band number from the raster masked : bool determines if nodatavals will be returned as np.nan to the user Returns ------- np.ndarray """ if band not in self.bands: raise ValueError("Band {} not a valid value") if self._dataset is None: array = np.copy(self.__arr_dict[band]) else: array = self._dataset.read(band) if masked: for v in self.nodatavals: array = array.astype(float) array[array == v] = np.nan return array
[docs] def write(self, name): """ Method to write raster data to a .tif file Parameters ---------- name : str output raster .tif file name """ rasterio = import_optional_dependency("rasterio") if not name.endswith(".tif"): name += ".tif" with rasterio.open(name, "w", **self._meta) as foo: for band, arr in self.__arr_dict.items(): foo.write(arr, band)
[docs] @staticmethod def load(raster: Union[str, os.PathLike]): """ Static method to load a raster file into the raster object Parameters ---------- raster : str or PathLike The path to the raster file Returns ------- A Raster object """ rasterio = import_optional_dependency("rasterio") dataset = rasterio.open(raster) array = dataset.read() bands = dataset.indexes meta = dataset.meta return Raster( array, bands, meta["crs"], meta["transform"], meta["nodata"], meta["driver"], )
[docs] def plot(self, ax=None, contour=False, **kwargs): """ Method to plot raster layers or contours. Parameters ---------- ax : matplotlib.pyplot.axes optional matplotlib axes for plotting contour : bool flag to indicate creation of contour plot **kwargs : matplotlib keyword arguments see matplotlib documentation for valid arguments for plot and contour. Returns ------- ax : matplotlib.pyplot.axes """ import_optional_dependency("rasterio") from rasterio.plot import show if self._dataset is not None: ax = show( self._dataset, ax=ax, contour=contour, **kwargs, ) else: d0 = len(self.__arr_dict) d1, d2 = None, None for _, arr in self.__arr_dict.items(): d1, d2 = arr.shape if d1 is None: raise AssertionError("No plottable arrays found") data = np.zeros((d0, d1, d2), dtype=float) i = 0 for _, arr in sorted(self.__arr_dict.items()): data[i, :, :] = arr i += 1 data = np.ma.masked_where(data == self.nodatavals, data) ax = show( data, ax=ax, contour=contour, transform=self._meta["transform"], **kwargs, ) return ax
[docs] def histogram(self, ax=None, **kwargs): """ Method to plot a histogram of digital numbers Parameters ---------- ax : matplotlib.pyplot.axes optional matplotlib axes for plotting **kwargs : matplotlib keyword arguments see matplotlib documentation for valid arguments for histogram Returns ------- ax : matplotlib.pyplot.axes """ import_optional_dependency("rasterio") from rasterio.plot import show_hist if "alpha" not in kwargs: kwargs["alpha"] = 0.3 if self._dataset is not None: ax = show_hist(self._dataset, ax=ax, **kwargs) else: d0 = len(self.__arr_dict) d1, d2 = None, None for _, arr in self.__arr_dict.items(): d1, d2 = arr.shape if d1 is None: raise AssertionError("No plottable arrays found") data = np.zeros((d0, d1, d2), dtype=float) i = 0 for _, arr in sorted(self.__arr_dict.items()): data[i, :, :] = arr i += 1 data = np.ma.masked_where(data == self.nodatavals, data) ax = show_hist(data, ax=ax, **kwargs) return ax