Source code for flopy.utils.gridintersect

import warnings

import matplotlib.pyplot as plt
import numpy as np
from matplotlib.collections import PatchCollection
from matplotlib.patches import PathPatch
from matplotlib.path import Path
from numpy.lib import recfunctions as nprecfns
from pandas import NA, DataFrame

from .geospatial_utils import GeoSpatialUtil
from .utl_import import import_optional_dependency

shapely = import_optional_dependency("shapely", errors="silent")


[docs]def parse_shapely_ix_result(collection, ix_result, shptyps=None): """Recursive function for parsing shapely intersection results. Returns a list of shapely shapes matching shptyps. Parameters ---------- collection : list state variable for storing result, generally an empty list ix_result : shapely.geometry type any shapely intersection result shptyps : str, list of str, or None, optional if None (default), return all types of shapes. if str, return shapes of that type, if list of str, return all types in list Returns ------- list list containing shapely geometries of type shptyps """ # convert shptyps to list if needed if isinstance(shptyps, str): shptyps = [shptyps] elif shptyps is None: shptyps = [None] # if empty if ix_result.is_empty: return collection # base case: geom_type is partial or exact match to shptyp elif ix_result.geom_type in shptyps: collection.append(ix_result) return collection # recursion for collections elif hasattr(ix_result, "geoms"): for ishp in ix_result.geoms: parse_shapely_ix_result(collection, ishp, shptyps=shptyps) # if collecting all types elif shptyps[0] is None: return collection.append(ix_result) return collection
[docs]class GridIntersect: """Class for intersecting shapely geometries with MODFLOW grids. Notes ----- - Supports structured and vertex grids. No support for unstructured grids. If grid is layered-unstructured, creating a single layer vertex-grid can be used as a workaround. - For linestrings and polygons only 2D intersection operations are supported. - Point intersections can optionally return layer position based on the z-coordinate. - The STR-tree can be disabled, but this is generally not recommended as some functions will not work without it. - The STR-tree query is based on the bounding box of the shape or collection, if the bounding box of the shape covers nearly the entire grid, the query won't be able to limit the search space much, resulting in slower performance. Therefore, it can sometimes be faster to intersect each individual shape in a collection than it is to intersect with the whole collection at once. """ def __init__(self, mfgrid, rtree=True, local=False): """Intersect shapes (Point, Linestring, Polygon) with a modflow grid. Parameters ---------- mfgrid : flopy modflowgrid MODFLOW grid as implemented in flopy rtree : bool, optional build an STR-Tree if True (default). If False no STR-tree is built, but intersects will filter and loop through candidate model gridcells (which is generally slower and not recommended). local : bool, optional use local model coordinates from model grid to build grid geometries, default is False and uses real-world coordinates (with offset and rotation). """ import_optional_dependency( "shapely", error_message="GridIntersect requires shapely" ) self.mfgrid = mfgrid self.local = local self.rtree = rtree # build arrays of geoms and cellids if self.mfgrid.grid_type == "structured": self.geoms, self.cellids = self._rect_grid_to_geoms_cellids() elif self.mfgrid.grid_type == "vertex": self.geoms, self.cellids = self._vtx_grid_to_geoms_cellids() else: raise NotImplementedError( f"Grid type {self.mfgrid.grid_type} not supported" ) # build STR-tree if specified if self.rtree: strtree = import_optional_dependency( "shapely.strtree", error_message="STRTree requires shapely", ) self.strtree = strtree.STRtree(self.geoms) def _parse_input_shape(self, shp, shapetype=None): """Internal method to parse input shape. Allows numpy arrays containing shapely geometries; otherwise delegates to ``GeoSpatialUtil`` for parsing. Parameters ---------- shp : shapely geometry, geojson object, shapefile.Shape, np.ndarray, or a FloPy geometry object Shape to intersect with the grid. If an ``np.ndarray`` is provided, all elements must be shapely geometries of the same type. shapetype : str, optional Type of shape ("point", "linestring", "polygon" or their multi-variants). Used by ``GeoSpatialUtil`` if shp is passed as a list of vertices. Default is None. Returns ------- tuple (geom, gtype) where geom is a shapely geometry or an array of shapely geometries, and gtype is the corresponding shapely GeometryType (or a matching string value). """ if isinstance(shp, np.ndarray) and isinstance(shp[0], shapely.Geometry): shapetypes = shapely.get_type_id(shp) assert len(np.unique(shapetypes)) == 1, ( "If passing an array of shapely geometries, all geometries must be " "of the same type." ) shapetype = shapely.GeometryType(shapetypes[0]) else: gu = GeoSpatialUtil(shp, shapetype=shapetype) shp = gu.shapely shapetype = gu.shapetype return shp, shapetype
[docs] def intersect( self, shp, shapetype=None, sort_by_cellid=True, return_all_intersections=False, contains_centroid=False, min_area_fraction=None, handle_z=False, geo_dataframe=None, ): """Intersect a shape with a model grid. Parameters ---------- shp : shapely.geometry, geojson object, shapefile.Shape, or flopy geometry object shapetype : str, optional type of shape (i.e. "point", "linestring", "polygon" or their multi-variants), used by GeoSpatialUtil if shp is passed as a list of vertices, default is None sort_by_cellid : bool sort results by cellid, ensures cell with lowest cellid is returned for boundary cases when using vertex methods, default is True return_all_intersections : bool, optional if True, return multiple intersection results for points or linestrings on grid cell boundaries (e.g. returns 2 intersection results if a point lies on the boundary between two grid cells). The default is False. Only used if shape type is "point" or "linestring". contains_centroid : bool, optional if True, only store intersection result if cell centroid is contained within intersection shape, only used if shape type is "polygon" min_area_fraction : float, optional float defining minimum intersection area threshold, if intersection area is smaller than min_frac_area * cell_area, do not store intersection result, only used if shape type is "polygon" handle_z : bool, optional Method for handling z-dimension in intersection results for point intersections. Default is False which ignores z-dimension. If True returns the layer index for each point. Points below/above the grid are returned as masked values or pd.NA if returned as dataframe. geo_dataframe : bool, optional If True, return a geopandas ``GeoDataFrame``, otherwise return a numpy recarray. Default will be set to True in the future; currently, None triggers a deprecation warning and returns a recarray (legacy behavior). Returns ------- numpy.recarray or geopandas.GeoDataFrame Intersection results. Result contains the following columns: - cellid: cellid of intersected grid cell - cellids: legacy column containing (row, col) tuples for structured grids, or cellids for vertex grids (deprecated, use "cellid" column instead) - ixshapes or geometry: shapely geometry of the intersection result And optionally: - layer: layer index of for points with z-dimension when handle_z is True - row: row index of intersected grid cell, for structured grids - col: column index of intersected grid cell, for structured grids - lengths: length of intersection results for linestrings - areas: areas of intersection results for polygons """ shp, shapetype = self._parse_input_shape(shp, shapetype=shapetype) # if array, only accept length 1 if isinstance(shp, np.ndarray) and len(shp) > 1: raise ValueError( "intersect() only accepts arrays containing one " f"{shapetype.name.lower()} at a time." ) if shapetype in { "Point", "MultiPoint", shapely.GeometryType.POINT, shapely.GeometryType.MULTIPOINT, }: rec = self._intersect_point( shp, sort_by_cellid=sort_by_cellid, return_all_intersections=return_all_intersections, ) # handle elevation data for points # if handle_z is True # if shp has z information # if there are intersection results if handle_z and shapely.has_z(shp).any() and len(rec.cellid) > 0: laypos = self.get_layer_from_z(shp, rec.cellid) # copy data to new array to include layer position rec = nprecfns.append_fields( rec, names="layer", data=laypos, dtypes=int, usemask=False, asrecarray=True, ) rec = np.ma.masked_where(rec.layer < 0, rec) elif shapetype in { "LineString", "MultiLineString", shapely.GeometryType.LINESTRING, shapely.GeometryType.MULTILINESTRING, shapely.GeometryType.LINEARRING, }: rec = self._intersect_linestring( shp, sort_by_cellid=sort_by_cellid, return_all_intersections=return_all_intersections, ) elif shapetype in { "Polygon", "MultiPolygon", shapely.GeometryType.POLYGON, shapely.GeometryType.MULTIPOLYGON, }: rec = self._intersect_polygon( shp, sort_by_cellid=sort_by_cellid, contains_centroid=contains_centroid, min_area_fraction=min_area_fraction, ) else: raise TypeError(f"Shapetype {shapetype} is not supported") if geo_dataframe is None: warnings.warn( "In the future this function will return a GeoDataFrame by default. " "Set geo_dataframe=True to adopt future behavior and silence this " "warning. Set geo_dataframe=False to silence this warning and maintain " "old behavior", DeprecationWarning, ) if geo_dataframe: gpd = import_optional_dependency("geopandas") if gpd is None: raise ModuleNotFoundError( "GeoDataFrame output requires geopandas to be installed." ) gdf = ( gpd.GeoDataFrame(rec) .rename(columns={"ixshapes": "geometry"}) .set_geometry("geometry") .replace(999999, NA) ) if self.mfgrid.crs is not None: gdf = gdf.set_crs(self.mfgrid.crs) return gdf return rec
def _rect_grid_to_geoms_cellids(self): """internal method, return shapely polygons and cellids for structured grid cells. Returns ------- geoms : array_like array of shapely Polygons cellids : array_like array of cellids """ shapely = import_optional_dependency("shapely") nrow = self.mfgrid.nrow ncol = self.mfgrid.ncol ncells = nrow * ncol cellids = np.arange(ncells) if self.local: xvertices, yvertices = np.meshgrid(*self.mfgrid.xyedges) else: xvertices = self.mfgrid.xvertices yvertices = self.mfgrid.yvertices # arrays of coordinates for rectangle cells I, J = np.ogrid[0:nrow, 0:ncol] xverts = np.stack( [ xvertices[I, J], xvertices[I, J + 1], xvertices[I + 1, J + 1], xvertices[I + 1, J], ] ).transpose((1, 2, 0)) yverts = np.stack( [ yvertices[I, J], yvertices[I, J + 1], yvertices[I + 1, J + 1], yvertices[I + 1, J], ] ).transpose((1, 2, 0)) # use array-based methods for speed geoms = shapely.polygons( shapely.linearrings( xverts.flatten(), y=yverts.flatten(), indices=np.repeat(cellids, 4), ) ) return geoms, cellids def _usg_grid_to_geoms_cellids(self): """internal method, return shapely polygons and cellids for unstructured grids. Returns ------- geoms : array_like array of shapely Polygons cellids : array_like array of cellids """ raise NotImplementedError() def _vtx_grid_to_geoms_cellids(self): """internal method, return shapely polygons and cellids for vertex grids. Returns ------- geoms : array_like array of shapely Polygons cellids : array_like array of cellids """ shapely = import_optional_dependency("shapely") if self.local: geoms = [ shapely.polygons( list( zip( *self.mfgrid.get_local_coords( *np.array(self.mfgrid.get_cell_vertices(node)).T ) ) ) ) for node in range(self.mfgrid.ncpl) ] else: geoms = [ shapely.polygons(self.mfgrid.get_cell_vertices(node)) for node in range(self.mfgrid.ncpl) ] return np.array(geoms), np.arange(self.mfgrid.ncpl)
[docs] def query_grid(self, shp, predicate=None): """Perform spatial query on the grid using a shapely geometry. If no spatial query is possible (``rtree=False``), returns all grid cells. Parameters ---------- shp : shapely.geometry shapely geometry predicate : str, optional spatial predicate to use for query, default is None. See documentation of self.strtree.query for options. Returns ------- numpy.ndarray For a single geometry, a 1-D array of cellids. For an array of geometries, a 2xN array where the first row is the input geometry index (``shp_id``) and the second row contains the matching cellids. """ if self.rtree: result = self.strtree.query(shp, predicate=predicate) else: # no spatial query result = self.cellids return result
[docs] def filter_query_result(self, shp, cellids): """Filter a query result to cells that truly intersect a shape. Used to (further) reduce the query result to cells that intersect with shp. This is primarily used when ``rtree=False``. Parameters ---------- shp : shapely.geometry shapely geometry that is prepared and used to filter query result cellids : iterable iterable of cellids, query result Returns ------- numpy.ndarray Array of cellids that intersect shp. """ # flipped arguments to be consistent with all other methods in class msg = ( "the cellids and shp arguments were flipped, please" " pass them as filter_query_result(shp, cellids)" ) if isinstance(cellids, np.ndarray): if isinstance(cellids[0], shapely.Geometry): warnings.warn(msg) cellids, shp = shp, cellids elif isinstance(cellids, shapely.Geometry): warnings.warn(msg) cellids, shp = shp, cellids # get only gridcells that intersect if not shapely.is_prepared(shp).all(): shapely.prepare(shp) qcellids = cellids[shapely.intersects(self.geoms[cellids], shp)] return qcellids
def _intersect_point_shapely(self, *args, **kwargs): """Deprecated method, use _intersect_point instead.""" warnings.warn( "_intersect_point_shapely is deprecated, use _intersect_point instead.", DeprecationWarning, ) return self._intersect_point(*args, **kwargs) def _intersect_point( self, shp, sort_by_cellid=True, return_all_intersections=False, ): """Intersect a Point or MultiPoint with the grid. Parameters ---------- shp : shapely Point or MultiPoint (or array with a single point) Geometry to intersect. sort_by_cellid : bool, optional If True (default), sort candidate cells by cellid return_all_intersections : bool, optional If True, return multiple intersection results for points on cell boundaries. Default is False. Returns ------- numpy.recarray Intersection results. Result contains the following columns: - cellid: cellid of intersected grid cell - cellids: legacy column containing (row, col) tuples for structured grids, or cellids for vertex grids (deprecated, use "cellid" column instead) - ixshapes or geometry: shapely geometry of the intersection result And optionally: - layer: layer index of for points with z-dimension when handle_z is True - row: row index of intersected grid cell, for structured grids - col: column index of intersected grid cell, for structured grids """ r = self.intersects(shp, dataframe=False) qcellids = r.cellid[r.cellid >= 0] if sort_by_cellid: qcellids = np.sort(qcellids) ixresult = shapely.intersection(shp, self.geoms[qcellids]) # discard empty intersection results mask_empty = shapely.is_empty(ixresult) # keep only Point and MultiPoint mask_type = np.isin( shapely.get_type_id(ixresult), [shapely.GeometryType.POINT, shapely.GeometryType.MULTIPOINT], ) ixresult = ixresult[~mask_empty & mask_type] qcellids = qcellids[~mask_empty & mask_type] if not return_all_intersections: keep_cid = [] keep_pts = [] parsed = [] for ishp, cid in zip(ixresult, qcellids): points = [] for pnt in shapely.get_parts(ishp): next_pnt = next(iter(pnt.coords)) if next_pnt not in parsed: points.append(pnt) parsed.append(next_pnt) if len(points) > 1: keep_pts.append(shapely.MultiPoint(points)) keep_cid.append(cid) elif len(points) == 1: keep_pts.append(points[0]) keep_cid.append(cid) else: keep_pts = ixresult keep_cid = qcellids if self.mfgrid.grid_type == "structured": names = ["cellids", "cellid", "row", "col", "ixshapes"] formats = ["O", int, int, int, "O"] else: names = ["cellids", "cellid", "ixshapes"] formats = [int, int, "O"] rec = np.recarray(len(keep_pts), names=names, formats=formats) rec.cellid = self.cellids[keep_cid] rec.ixshapes = keep_pts # if structured calculate (i, j) cell address if self.mfgrid.grid_type == "structured": rec.cellids, rec.row, rec.col = self._cellid_to_rowcol( self.cellids[keep_cid] ) else: rec.cellids = rec.cellid # NOTE: legacy support for cellids column return rec def _intersect_linestring_shapely(self, *args, **kwargs): """Deprecated method, use _intersect_linestring instead.""" warnings.warn( "_intersect_linestring_shapely is deprecated, " "use _intersect_linestring instead.", DeprecationWarning, ) return self._intersect_linestring(*args, **kwargs) def _intersect_linestring( self, shp, sort_by_cellid=True, return_all_intersections=False, ): """Intersect a LineString or MultiLineString with the grid. Parameters ---------- shp : shapely LineString or MultiLineString Geometry to intersect. sort_by_cellid : bool, optional If True (default), sort candidate cells by cellid before intersecting. return_all_intersections : bool, optional If True, keep overlapping boundary segments as separate results. Default is False. Returns ------- numpy.recarray Intersection results. Result contains the following columns: - cellid: cellid of intersected grid cell - cellids: legacy column containing (row, col) tuples for structured grids, or cellids for vertex grids (deprecated, use "cellid" column instead) - ixshapes or geometry: shapely geometry of the intersection result And optionally: - row: row index of intersected grid cell, for structured grids - col: column index of intersected grid cell, for structured grids - lengths: length of intersection results for linestrings """ if self.rtree: qcellids = self.strtree.query(shp, predicate="intersects") else: qcellids = self.filter_query_result(shp, self.cellids) if sort_by_cellid: qcellids = np.sort(qcellids) ixresult = shapely.intersection(shp, self.geoms[qcellids]) # discard empty intersection results mask_empty = shapely.is_empty(ixresult) # keep only Linestring and MultiLineString geomtype_ids = shapely.get_type_id(ixresult) all_ids = [ shapely.GeometryType.LINESTRING, shapely.GeometryType.MULTILINESTRING, shapely.GeometryType.GEOMETRYCOLLECTION, ] line_ids = [ shapely.GeometryType.LINESTRING, shapely.GeometryType.MULTILINESTRING, ] mask_type = np.isin(geomtype_ids, all_ids) ixresult = ixresult[~mask_empty & mask_type] qcellids = qcellids[~mask_empty & mask_type] # parse geometry collections (i.e. when part of linestring touches a cell edge, # resulting in a point intersection result) if shapely.GeometryType.GEOMETRYCOLLECTION in geomtype_ids: def parse_linestrings_in_geom_collection(gc): parts = shapely.get_parts(gc) parts = parts[np.isin(shapely.get_type_id(parts), line_ids)] if len(parts) > 1: p = shapely.multilinestrings(parts) elif len(parts) == 0: p = shapely.LineString() else: p = parts[0] return p mask_gc = ( geomtype_ids[~mask_empty & mask_type] == shapely.GeometryType.GEOMETRYCOLLECTION ) # NOTE: not working for multiple geometry collections, result is reduced # to a single multilinestring, which causes doubles in the result # ixresult[mask_gc] = np.apply_along_axis( # parse_linestrings_in_geom_collection, # axis=0, # arr=ixresult[mask_gc], # ) ixresult[mask_gc] = [ parse_linestrings_in_geom_collection(gc) for gc in ixresult[mask_gc] ] if not return_all_intersections: # intersection with grid cell boundaries ixbounds = shapely.intersection( shp, shapely.get_exterior_ring(self.geoms[qcellids]) ) mask_bnds_empty = shapely.is_empty(ixbounds) mask_bnds_type = np.isin(shapely.get_type_id(ixbounds), all_ids) # get ids of boundary intersections idxs = np.nonzero(~mask_bnds_empty & mask_bnds_type)[0] # loop through results, starting with highest cellid jdxs = idxs[::-1] for jx, i in enumerate(jdxs): # calculate intersection with results w potential boundary # intersections isect = ixresult[i].intersection(ixresult[idxs]) # masks to obtain overlapping intersection result mask_self = idxs == i # select not self mask_bnds_empty = shapely.is_empty(isect) # select boundary ix result mask_overlap = np.isin(shapely.get_type_id(isect), all_ids) # calculate difference between self and overlapping result diff = shapely.difference( ixresult[i], isect[mask_overlap & ~mask_self & ~mask_bnds_empty], ) # update intersection result if necessary if len(diff) > 0: ixresult[jdxs[jx]] = diff[0] # mask out empty results mask_keep = ~shapely.is_empty(ixresult) ixresult = ixresult[mask_keep] qcellids = qcellids[mask_keep] if self.mfgrid.grid_type == "structured": names = ["cellids", "cellid", "row", "col", "ixshapes", "lengths"] formats = ["O", int, int, int, "O", float] else: names = ["cellids", "cellid", "ixshapes", "lengths"] formats = ["O", int, "O", float] rec = np.recarray(len(ixresult), names=names, formats=formats) rec.cellid = self.cellids[qcellids] rec.ixshapes = ixresult rec.lengths = shapely.length(ixresult) # if structured grid calculate (i, j) cell address if self.mfgrid.grid_type == "structured": rec.cellids, rec.row, rec.col = self._cellid_to_rowcol( self.cellids[qcellids] ) else: rec.cellids = rec.cellid # NOTE: legacy support for cellids column return rec def _intersect_polygon_shapely(self, *args, **kwargs): """Deprecated method, use _intersect_polygon instead.""" import warnings warnings.warn( "_intersect_polygon_shapely is deprecated, use _intersect_polygon instead.", DeprecationWarning, ) return self._intersect_polygon(*args, **kwargs) def _intersect_polygon( self, shp, sort_by_cellid=True, contains_centroid=False, min_area_fraction=None, ): """Intersect a Polygon or MultiPolygon with the grid. Parameters ---------- shp : shapely Polygon or MultiPolygon Geometry to intersect. sort_by_cellid : bool, optional If True (default), sort candidate cells by cellid before intersecting. contains_centroid : bool, optional If True, only keep results where the cell centroid is contained in (or touches) the intersection. Default is False. min_area_fraction : float, optional Minimum fractional area threshold relative to the cell area. Cells with an intersection area smaller than ``min_area_fraction * cell_area`` are discarded. Default is None (no threshold). Returns ------- numpy.recarray Intersection results. Result contains the following columns: - cellid: cellid of intersected grid cell - cellids: legacy column containing (row, col) tuples for structured grids, or cellids for vertex grids (deprecated, use "cellid" column instead) - ixshapes or geometry: shapely geometry of the intersection result And optionally: - layer: layer index of for points with z-dimension when handle_z is True - row: row index of intersected grid cell, for structured grids - col: column index of intersected grid cell, for structured grids - areas: areas of intersection results for polygons """ if self.rtree: qcellids = self.strtree.query(shp, predicate="intersects") else: qcellids = self.filter_query_result(shp, self.cellids) if sort_by_cellid: qcellids = np.sort(qcellids) ixresult = shapely.intersection(shp, self.geoms[qcellids]) # discard empty intersection results mask_empty = shapely.is_empty(ixresult) # keep only Polygons and MultiPolygons geomtype_ids = shapely.get_type_id(ixresult) mask_type = np.isin(geomtype_ids, [3, 6, 7]) ixresult = ixresult[~mask_empty & mask_type] qcellids = qcellids[~mask_empty & mask_type] # parse geometry collections (i.e. when part of polygon lies on cell edge, # resulting in a linestring intersection result) if 7 in geomtype_ids: def parse_polygons_in_geom_collection(gc): parts = shapely.get_parts(gc) parts = parts[np.isin(shapely.get_type_id(parts), [3, 6])] if len(parts) > 1: p = shapely.multipolygons(parts) elif len(parts) == 0: p = shapely.Polygon() else: p = parts[0] return p mask_gc = geomtype_ids[~mask_empty & mask_type] == 7 ixresult[mask_gc] = np.apply_along_axis( parse_polygons_in_geom_collection, axis=0, arr=ixresult[mask_gc] ) # check centroids if contains_centroid: centroids = shapely.centroid(self.geoms[qcellids]) mask_centroid = shapely.contains(ixresult, centroids) | shapely.touches( ixresult, centroids ) ixresult = ixresult[mask_centroid] qcellids = qcellids[mask_centroid] # check intersection area if min_area_fraction: ix_areas = shapely.area(ixresult) cell_areas = shapely.area(self.geoms[qcellids]) mask_area_frac = (ix_areas / cell_areas) >= min_area_fraction ixresult = ixresult[mask_area_frac] qcellids = qcellids[mask_area_frac] # fill rec array if self.mfgrid.grid_type == "structured": names = ["cellids", "cellid", "row", "col", "ixshapes", "areas"] formats = ["O", int, int, int, "O", float] else: names = ["cellids", "cellid", "ixshapes", "areas"] formats = ["O", int, "O", float] rec = np.recarray(len(ixresult), names=names, formats=formats) rec.cellid = self.cellids[qcellids] rec.ixshapes = ixresult rec.areas = shapely.area(ixresult) # if structured calculate (i, j) cell address if self.mfgrid.grid_type == "structured": rec.cellids, rec.row, rec.col = self._cellid_to_rowcol( self.cellids[qcellids] ) else: rec.cellids = rec.cellid # NOTE: legacy support for cellids column return rec
[docs] def intersects( self, shp, shapetype=None, dataframe=None, ): """Return candidate grid cellids that intersect with shape(s). Parameters ---------- shp : shapely.geometry, geojson geometry, shapefile.shape, or flopy geometry object shape to intersect with the grid shapetype : str, optional type of shape (i.e. "point", "linestring", "polygon" or their multi-variants), used by GeoSpatialUtil if shp is passed as a list of vertices, default is None dataframe : bool, optional If True, return a ``pandas.DataFrame``; otherwise return a numpy recarray. Default will be set to True in the future; currently, None triggers a deprecation warning and returns a recarray (legacy behavior). Returns ------- numpy.recarray or pandas.DataFrame Grid cell candidates for intersections. Result contains the following columns: - cellid: cellid of candidate grid cell - cellids: legacy column containing (row, col) tuples for structured grids, or cellids for vertex grids (deprecated, use "cellid" column instead) And optionally: - row: row index of intersected grid cell, for structured grids - col: column index of intersected grid cell, for structured grids """ shp, shapetype = self._parse_input_shape(shp, shapetype=shapetype) # query grid or strtree qcellids = self.query_grid(shp, predicate="intersects") if not self.rtree: if isinstance(shp, np.ndarray) and len(shp) > 1: raise ValueError( "intersects() only accepts arrays containing one " f"{shapetype.name.lower()} at a time when rtree=False." ) qfiltered = self.filter_query_result(shp, qcellids) else: qfiltered = qcellids # sort cellids if qfiltered.ndim == 1: qfiltered = np.sort(qfiltered) else: qfiltered = qfiltered[:, np.lexsort((qfiltered[1], qfiltered[0]))] # determine size of output array nr = len(qfiltered) if qfiltered.ndim == 1 else qfiltered.shape[1] # build rec-array # use float dtype to allow nans in row/col/cellid if self.mfgrid.grid_type == "structured": names = ["shp_id", "cellids", "cellid", "row", "col"] formats = [int, "O", int, int, int] else: names = ["shp_id", "cellids", "cellid"] formats = [int, int, int] rec = np.recarray(nr, names=names, formats=formats) # shp was passed as single geometry if qfiltered.ndim == 1: rec.shp_id[:] = 0 rec.cellid = qfiltered # shape passed as array of geometries else: rec.shp_id = qfiltered[0] rec.cellid = qfiltered[1] if self.mfgrid.grid_type == "structured": rec.cellids, rec.row, rec.col = self._cellid_to_rowcol(rec.cellid) else: rec.cellids = rec.cellid # NOTE: legacy support for cellids column if dataframe is None: warnings.warn( "In the future this function will return a dataframe by default. " "Set dataframe=True to adopt future behavior and silence this warning. " "Set dataframe=False to silence this warning and maintain old behavior", DeprecationWarning, ) if dataframe: return DataFrame(rec).set_index("shp_id") return rec
def _cellid_to_rowcol(self, cellids): """Convert cellid (node number) to row, col. Parameters ---------- cellids : array_like array of cellids to convert Returns ------- tuple of array_like array of (row, col) tuples, array of rows, array of columns """ idx = np.nonzero(cellids >= 0) row = np.full_like(cellids, -1, dtype=int) # -1 is invalid col = np.full_like(cellids, -1, dtype=int) # -1 is invalid row[idx], col[idx] = self.mfgrid.get_lrc([cellids[idx]])[0][1:] # NOTE: build tuple for backward compatibility rctuple = np.full_like(cellids, -1, dtype=object) rctuple[idx] = list(zip(row[idx], col[idx])) return rctuple, row, col
[docs] def points_to_cellids( self, pts, handle_z=False, dataframe=True, ): """Return cellids for points intersecting the grid. Parameters ---------- pts : shapely geometry, geojson geometry, shapefile.Shape, np.ndarray, or FloPy geometry object Point(s) to intersect with the grid. array inputs must contain shapely point (or multipoint) geometries. handle_z : bool, optional Handle z-dimension for points. If True, returns a "layer" column with the computed layer index for each point (NA is returned where the points lie above/below the model grid). Default is False. dataframe : bool, optional If True, return a ``pandas.DataFrame``; otherwise return a numpy recarray. Default is True. Returns ------- pandas.DataFrame or numpy.recarray Grid cell indices per point. Result contains the following columns: - cellid: cellid of grid cell containing point - cellids: legacy column containing (row, col) tuples for structured grids, or cellids for vertex grids (deprecated, use "cellid" column instead) And optionally: - row: row index of intersected grid cell, for structured grids - col: column index of intersected grid cell, for structured grids - layer: layer index of for points with z-dimension when handle_z is True Notes ----- Requires ``rtree=True`` when initializing ``GridIntersect``. """ if not self.rtree: raise ValueError( "points_to_cellids() requires rtree=True when" " initializing GridIntersect" ) if not isinstance(pts, np.ndarray): if shapely.get_type_id(pts) == shapely.GeometryType.MULTIPOINT: pts = np.array(shapely.get_parts(pts)) else: pts = np.array([pts]) # query grid or strtree qfiltered = self.query_grid(pts, predicate="intersects") # sort cellids if qfiltered.ndim == 1: qfiltered = np.sort(qfiltered) else: qfiltered = qfiltered[:, np.lexsort((qfiltered[1], qfiltered[0]))] # determine size of output array if isinstance(pts, np.ndarray): # one result per point nr = len(pts) else: # single point nr = 1 if len(qfiltered) > 0 else 0 # 1 if intersects, else 0 # build rec-array if self.mfgrid.grid_type == "structured": names = ["shp_id", "cellids", "cellid", "row", "col"] formats = [int, "O", int, int, int] # float to allow nans in row/col else: names = ["shp_id", "cellids", "cellid"] formats = [int, int, int] # float to allow nans rec = np.recarray(nr, names=names, formats=formats) rec.cellid = -1 # invalid value by default # return only 1 gr id cell intersection result for each point uniques, idx = np.unique(qfiltered[0], return_index=True) rec.shp_id = np.arange(nr) rec.cellid[uniques] = qfiltered[1, idx] if self.mfgrid.grid_type == "structured": rec.cellids, rec.row, rec.col = self._cellid_to_rowcol(rec.cellid) else: rec.cellids = rec.cellid # NOTE: legacy support for cellids column if handle_z and shapely.has_z(pts).any() and len(rec.cellid) > 0: laypos = self.get_layer_from_z(pts, rec.cellid) # copy data to new array to include layer position rec = nprecfns.append_fields( rec, names="layer", data=laypos, dtypes=int, usemask=False, asrecarray=True, ) if dataframe: # replace invalid indices with NA, replace invalid layer with NA return ( DataFrame(rec).replace(-1, NA).replace(999_999, NA).set_index("shp_id") ) return np.ma.masked_where(rec.cellid < 0, rec)
[docs] @staticmethod def plot_polygon(polys, ax=None, **kwargs): """Plot polygons. Parameters ---------- polys : collection of polygons list, array or GeoSeries containing polygons ax : matplotlib.pyplot.axes, optional axes to plot onto, if not provided, creates a new figure **kwargs: passed to the plot function Returns ------- matplotlib.pyplot.axes returns the axes handle """ if ax is None: _, ax = plt.subplots() ax.set_aspect("equal", adjustable="box") autoscale = True else: autoscale = False patches = [] if "facecolor" in kwargs: use_facecolor = True fc = kwargs.pop("facecolor") else: use_facecolor = None def add_poly_patch(poly): if not use_facecolor: fc = f"C{i % 10}" ppi = _polygon_patch(poly, facecolor=fc, **kwargs) patches.append(ppi) if isinstance(polys, (shapely.Polygon, shapely.MultiPolygon)): polys = [polys] for i, ishp in enumerate(polys): if hasattr(ishp, "geoms"): for geom in ishp.geoms: add_poly_patch(geom) else: add_poly_patch(ishp) pc = PatchCollection(patches, match_original=True) ax.add_collection(pc) if autoscale: ax.autoscale_view() return ax
[docs] @staticmethod def plot_linestring(ls, ax=None, cmap=None, **kwargs): """Plot linestrings. Parameters ---------- ls : collection of linestrings list, array or GeoSeries containing linestrings ax : matplotlib.pyplot.axes, optional axes to plot onto, if not provided, creates a new figure cmap : str matplotlib colormap **kwargs: passed to the plot function Returns ------- matplotlib.pyplot.axes returns the axes handle """ shapely_plot = import_optional_dependency("shapely.plotting") if ax is None: _, ax = plt.subplots() ax.set_aspect("equal", adjustable="box") specified_color = True if "c" in kwargs: c = kwargs.pop("c") elif "color" in kwargs: c = kwargs.pop("color") else: specified_color = False if isinstance(ls, (shapely.LineString, shapely.MultiLineString)): ls = [ls] if cmap is not None: colormap = plt.get_cmap(cmap) colors = colormap(np.linspace(0, 1, len(ls))) for i, ishp in enumerate(ls): if not specified_color: if cmap is None: c = f"C{i % 10}" else: c = colors[i] shapely_plot.plot_line(ishp, ax=ax, color=c, **kwargs) return ax
[docs] @staticmethod def plot_point(pts, ax=None, **kwargs): """Plot points. Parameters ---------- pts : collection of points array or GeoSeries containing point geometries ax : matplotlib.pyplot.axes, optional axes to plot onto, if not provided, creates a new figure **kwargs: passed to the scatter function Returns ------- matplotlib.pyplot.axes returns the axes handle """ shapely = import_optional_dependency("shapely") shapely_plot = import_optional_dependency("shapely.plotting") if ax is None: _, ax = plt.subplots() # allow for result to be geodataframe if isinstance(pts, (shapely.Point, shapely.MultiPoint)): pts = [pts] maskpts = np.isin( shapely.get_type_id(pts), [shapely.GeometryType.POINT, shapely.GeometryType.MULTIPOINT], ) shapely_plot.plot_points(pts[maskpts], ax=ax, **kwargs) return ax
[docs] def plot_intersection_result(self, result, plot_grid=True, ax=None, **kwargs): """Plot intersection result. Parameters ---------- result : numpy.rec.recarray or geopandas.GeoDataFrame result of intersect() plot_grid : bool, optional plot model grid, by default True ax : matplotlib.Axes, optional axes to plot on, by default None which creates a new axis Returns ------- ax : matplotlib.Axes returns axes handle """ shapely = import_optional_dependency("shapely") if plot_grid: self.mfgrid.plot(ax=ax) geoms = ( result["ixshapes"] if isinstance(result, np.rec.recarray) else result["geometry"] ) if np.isin( shapely.get_type_id(geoms), [shapely.GeometryType.POINT, shapely.GeometryType.MULTIPOINT], ).all(): ax = GridIntersect.plot_point(geoms, ax=ax, **kwargs) elif np.isin( shapely.get_type_id(geoms), [ shapely.GeometryType.LINESTRING, shapely.GeometryType.MULTILINESTRING, shapely.GeometryType.LINEARRING, ], ).all(): ax = GridIntersect.plot_linestring(geoms, ax=ax, **kwargs) elif np.isin( shapely.get_type_id(geoms), [shapely.GeometryType.POLYGON, shapely.GeometryType.MULTIPOLYGON], ).all(): ax = GridIntersect.plot_polygon(geoms, ax=ax, **kwargs) return ax
[docs] def get_layer_from_z(self, pts, cellids): """Compute layer indices from point z-values. Parameters ---------- pts : shapely geometry Points geometry (single or array-like) with z-values. cellids : array_like Array of candidate cellids for the points. Returns ------- numpy.ndarray layer index for each point. Points below/above the grid are returned with index -1. """ z_arr = np.atleast_1d(shapely.get_z(pts)) mask_valid = cellids >= 0 if self.mfgrid.grid_type == "structured": row, col = self.mfgrid.get_lrc([cellids[mask_valid].astype(int)])[0][1:] surface_elevations = self.mfgrid.top_botm[:, row, col] elif self.mfgrid.grid_type == "vertex": surface_elevations = self.mfgrid.top_botm[:, cellids[mask_valid]] else: raise NotImplementedError( "get_layer_from_z() is only implemented for " "structured and vertex grids." ) zb = surface_elevations < z_arr[mask_valid] mask_above = zb.all(axis=0) mask_below = (~zb).all(axis=0) laypos = (np.nanargmax(zb, axis=0) - 1).astype(int) laypos[mask_above] = -1 laypos[mask_below] = -1 laypos_full = np.full_like(z_arr, -1, dtype=int) laypos_full[mask_valid] = laypos return laypos_full
def _polygon_patch(polygon, **kwargs): patch = PathPatch( Path.make_compound_path( Path(np.asarray(polygon.exterior.coords)[:, :2]), *[Path(np.asarray(ring.coords)[:, :2]) for ring in polygon.interiors], ), **kwargs, ) return patch