Source code for flopy.utils.cvfdutil

import warnings

import numpy as np
import pandas as pd

from .utl_import import import_optional_dependency


[docs]def area_of_polygon(x, y): shapely = import_optional_dependency("shapely") from shapely.geometry import Polygon pgon = Polygon(zip(x, y)) return pgon.area
[docs]def centroid_of_polygon(points): shapely = import_optional_dependency("shapely") from shapely.geometry import Polygon pgon = Polygon(points) return pgon.centroid.x, pgon.centroid.y
[docs]class Point: def __init__(self, x, y): self.x = x self.y = y return
[docs] def offset(self, x0, y0): self.x -= x0 self.y -= y0
[docs] def normalize(self, dx, dy): if dx > 0.0: self.x /= dx if dy > 0.0: self.y /= dy
[docs]def normalize_points(a, b, c): x0, y0 = min(a.x, b.x), min(a.y, b.y) xl, yl = abs(a.x - b.x), abs(a.y - b.y) a.offset(x0, y0) b.offset(x0, y0) c.offset(x0, y0) a.normalize(xl, yl) b.normalize(xl, yl) c.normalize(xl, yl) return a, b, c
[docs]def isBetween(a, b, c, epsilon=0.001): a, b, c = normalize_points(a, b, c) crossproduct = (c.y - a.y) * (b.x - a.x) - (c.x - a.x) * (b.y - a.y) if abs(crossproduct) > epsilon: return False # (or != 0 if using integers) dotproduct = (c.x - a.x) * (b.x - a.x) + (c.y - a.y) * (b.y - a.y) if dotproduct < 0: return False squaredlengthba = (b.x - a.x) * (b.x - a.x) + (b.y - a.y) * (b.y - a.y) if dotproduct > squaredlengthba: return False return True
[docs]def shared_face(ivlist1, ivlist2): for i in range(len(ivlist1) - 1): iv1 = ivlist1[i] iv2 = ivlist1[i + 1] for i2 in range(len(ivlist2) - 1): if ivlist2[i2 : i2 + 1] == [iv2, iv1]: return True return False
[docs]def segment_face(ivert, ivlist1, ivlist2, vertices): """ Check the vertex lists for cell 1 and cell 2. Add a new vertex to cell 1 if necessary. Parameters ---------- ivert : int vertex number to check ivlist1 : list list of vertices for cell 1. Add a new vertex to this cell if needed. ivlist2 : list list of vertices for cell2. vertices : ndarray array of x, y vertices Returns ------- segmented : bool Return True if a face in cell 1 was split up by adding a new vertex """ # go through ivlist1 and find faces that have ivert faces_to_check = [] for ipos in range(len(ivlist1) - 1): face = (ivlist1[ipos], ivlist1[ipos + 1]) if ivert in face: faces_to_check.append(face) # go through ivlist2 and find points to check points_to_check = [] for ipos in range(len(ivlist2) - 1): if ivlist2[ipos] == ivert: points_to_check.append(ivlist2[ipos + 1]) elif ivlist2[ipos + 1] == ivert: points_to_check.append(ivlist2[ipos]) for face in faces_to_check: iva, ivb = face xa, ya = vertices[iva] xb, yb = vertices[ivb] for ivc in points_to_check: if ivc not in face: a = Point(xa, ya) b = Point(xb, yb) x, y = vertices[ivc] c = Point(x, y) if isBetween(a, b, c): ipos = ivlist1.index(ivb) if ipos == 0: ipos = len(ivlist1) - 1 ivlist1.insert(ipos, ivc) return True return False
[docs]def to_cvfd( vertdict, nodestart=None, nodestop=None, skip_hanging_node_check=False, duplicate_decimals=9, verbose=False, detect_non_convergence=True, ): """ Convert a vertex dictionary into verts and iverts Parameters ---------- vertdict A dictionary {icell: [(x1, y1), (x2, y2), (x3, y3), ...]} nodestart : int Starting node number. (default is zero) nodestop : int Ending node number up to but not including. (default is len(vertdict)) skip_hanging_node_check : bool Skip the hanging node check. The hanging node check is designed for quad-based grid refinement (e.g., quadtree grids) where larger cells are subdivided, creating "hanging nodes" that need to be added to neighboring cells. The check may not converge for grids with floating point precision artifacts like nearly duplicate vertices. Set this option True to avoid this. If False and non-convergence is detected (assuming detect_non_convergence is True), a warning will be issued. duplicate_decimals : int Decimals to round duplicate vertex checks. GRIDGEN can occasionally produce very-nearly overlapping vertices, this can be used to change the sensitivity for filtering out duplicates. (default is 9) verbose : bool Print messages to the screen. (default is False) detect_non_convergence : bool Enable automatic detection of non-convergent hanging node checks. When enabled, the algorithm monitors segmentation counts across iterations. If counts remain high and are flat or increasing, stop early and issue a warning. This prevents infinite loops while still allowing slow-but-steady decrease to converge for complex grids. Set False to disable this check and allow unlimited iterations. (default is True) Returns ------- verts : ndarray array of x, y vertices iverts : list list containing a list for each cell """ if nodestart is None: nodestart = 0 if nodestop is None: nodestop = len(vertdict) ncells = nodestop - nodestart # First create vertexdict {(x1, y1): ivert1, (x2, y2): ivert2, ...} and # vertexlist [[ivert1, ivert2, ...], [ivert9, ivert10, ...], ...] # In the process, filter out any duplicate vertices vertexdict = {} vertexlist = [] xcyc = np.empty((ncells, 2), dtype=float) iv = 0 nvertstart = 0 if verbose: print("Converting vertdict to cvfd representation.") print(f"Number of cells in vertdict is: {len(vertdict)}") print( f"Cell {nodestart} up to {nodestop} (but not including) will be processed." ) for icell in range(nodestart, nodestop): points = vertdict[icell] nvertstart += len(points) xc, yc = centroid_of_polygon(points) xcyc[icell, 0] = xc xcyc[icell, 1] = yc ivertlist = [] for p in points: pt = ( round(p[0], duplicate_decimals), round(p[1], duplicate_decimals), ) if pt in vertexdict: ivert = vertexdict[pt] else: vertexdict[pt] = iv ivert = iv iv += 1 ivertlist.append(ivert) if ivertlist[0] != ivertlist[-1]: raise Exception(f"Cell {icell} not closed") vertexlist.append(ivertlist) # next create vertex_cell_dict = {}; for each vertex, store list of cells # that use it nvert = len(vertexdict) if verbose: print(f"Started with {nvertstart} vertices.") print(f"Ended up with {nvert} vertices.") print(f"Reduced total number of vertices by {nvertstart - nvert}") print("Creating dict of vertices with their associated cells") vertex_cell_dict = {} for icell in range(nodestart, nodestop): ivertlist = vertexlist[icell] for ivert in ivertlist: if ivert in vertex_cell_dict: if icell not in vertex_cell_dict[ivert]: vertex_cell_dict[ivert].append(icell) else: vertex_cell_dict[ivert] = [icell] if verbose: print("Done creating dict of vertices with their associated cells") # Now, go through each vertex and look at the cells that use the vertex. # For quadtree-like grids, there may be a need to add a new hanging node # vertex to the larger cell. vertexdict_keys = list(vertexdict.keys()) if not skip_hanging_node_check: if verbose: print("Checking for hanging nodes.") finished = False iteration = 0 segmentation_history = [] non_convergence_threshold = 3 while not finished: finished = True iteration += 1 segmentations_this_iter = 0 for ivert, cell_list in vertex_cell_dict.items(): for icell1 in cell_list: for icell2 in cell_list: # skip if same cell if icell1 == icell2: continue # skip if share face already ivertlist1 = vertexlist[icell1] ivertlist2 = vertexlist[icell2] if shared_face(ivertlist1, ivertlist2): continue # don't share a face, so need to segment if necessary segmented = segment_face( ivert, ivertlist1, ivertlist2, vertexdict_keys, ) if segmented: finished = False segmentations_this_iter += 1 segmentation_history.append(segmentations_this_iter) if ( detect_non_convergence and len(segmentation_history) >= non_convergence_threshold ): recent_counts = segmentation_history[-non_convergence_threshold:] first_count = recent_counts[0] last_count = recent_counts[-1] # Check if the count is flat or increasing # but only if we're doing substantial work min_segmentations_threshold = 50 if ( first_count >= min_segmentations_threshold and last_count >= first_count ): warnings.warn( f"Hanging node check stopped after {iteration} iterations " f"due to non-convergence (segmentation counts are flat or " f"increasing at {segmentations_this_iter} per iteration). " f"This typically occurs with complex Voronoi or other " f"unstructured grids. Set skip_hanging_node_check=True " f"for these grid types.", UserWarning, ) break if verbose: if finished: print(f"Done checking for hanging nodes after {iteration} iterations.") else: print(f"Aborted hanging node check after {iteration} iterations.") verts = np.array(vertexdict_keys) iverts = vertexlist return verts, iverts
[docs]def shapefile_to_cvfd(shp, **kwargs): import shapefile print(f"Translating shapefile ({shp}) into cvfd format") sf = shapefile.Reader(shp) shapes = sf.shapes() vertdict = {} for icell, shape in enumerate(shapes): points = shape.points vertdict[icell] = points verts, iverts = to_cvfd(vertdict, **kwargs) return verts, iverts
[docs]def shapefile_to_xcyc(shp): """ Get cell centroid coordinates Parameters ---------- shp : string Name of shape file Returns ------- xcyc : ndarray x, y coordinates of all polygons in shp """ import shapefile print(f"Translating shapefile ({shp}) into cell centroids") sf = shapefile.Reader(shp) shapes = sf.shapes() ncells = len(shapes) xcyc = np.empty((ncells, 2), dtype=float) for icell, shape in enumerate(shapes): points = shape.points xc, yc = centroid_of_polygon(points) xcyc[icell, 0] = xc xcyc[icell, 1] = yc return xcyc
[docs]def gridlist_to_verts(gridlist): """ Take a list of flopy structured model grids and convert them into vertices. The idomain can be set to remove cells in a parent grid. Cells from a child grid will patched in to make a single set of vertices. Cells will be numbered according to consecutive numbering of active cells in the grid list. Parameters ---------- gridlist : list List of flopy.discretization.modelgrid. Must be of type structured grids Returns ------- verts, iverts : np.ndarray, list vertices and list of cells and which vertices comprise the cells """ vertdict = {} icell = 0 for sg in gridlist: ilays, irows, icols = np.asarray(sg.idomain > 0).nonzero() for _, i, j in zip(ilays, irows, icols): v = sg.get_cell_vertices(i, j) vertdict[icell] = v + [v[0]] icell += 1 verts, iverts = to_cvfd(vertdict, verbose=False) return verts, iverts
[docs]def get_disv_gridprops(verts, iverts, xcyc=None): """ Calculate disv grid properties from verts and iverts Parameters ---------- verts : ndarray 2d array of x, y vertices iverts : list list of size ncpl, with a list of vertex numbers for each cell Returns ------- gridprops : dict Dictionary containing entries that can be passed directly into the modflow6 disv package. """ nvert = verts.shape[0] ncpl = len(iverts) if xcyc is None: xcyc = np.empty((ncpl, 2), dtype=float) for icell in range(ncpl): vlist = [(verts[ivert, 0], verts[ivert, 1]) for ivert in iverts[icell]] xcyc[icell, 0], xcyc[icell, 1] = centroid_of_polygon(vlist) else: assert xcyc.shape == (ncpl, 2) vertices = [] for i in range(nvert): vertices.append((i, verts[i, 0], verts[i, 1])) cell2d = [] for i in range(ncpl): cell2d.append([i, xcyc[i, 0], xcyc[i, 1], len(iverts[i])] + list(iverts[i])) gridprops = {} gridprops["ncpl"] = ncpl gridprops["nvert"] = nvert gridprops["vertices"] = vertices gridprops["cell2d"] = cell2d return gridprops