import warnings
import numpy as np
import pandas as pd
from .utl_import import import_optional_dependency
[docs]def area_of_polygon(x, y):
shapely = import_optional_dependency("shapely")
from shapely.geometry import Polygon
pgon = Polygon(zip(x, y))
return pgon.area
[docs]def centroid_of_polygon(points):
shapely = import_optional_dependency("shapely")
from shapely.geometry import Polygon
pgon = Polygon(points)
return pgon.centroid.x, pgon.centroid.y
[docs]class Point:
def __init__(self, x, y):
self.x = x
self.y = y
return
[docs] def offset(self, x0, y0):
self.x -= x0
self.y -= y0
[docs] def normalize(self, dx, dy):
if dx > 0.0:
self.x /= dx
if dy > 0.0:
self.y /= dy
[docs]def normalize_points(a, b, c):
x0, y0 = min(a.x, b.x), min(a.y, b.y)
xl, yl = abs(a.x - b.x), abs(a.y - b.y)
a.offset(x0, y0)
b.offset(x0, y0)
c.offset(x0, y0)
a.normalize(xl, yl)
b.normalize(xl, yl)
c.normalize(xl, yl)
return a, b, c
[docs]def isBetween(a, b, c, epsilon=0.001):
a, b, c = normalize_points(a, b, c)
crossproduct = (c.y - a.y) * (b.x - a.x) - (c.x - a.x) * (b.y - a.y)
if abs(crossproduct) > epsilon:
return False # (or != 0 if using integers)
dotproduct = (c.x - a.x) * (b.x - a.x) + (c.y - a.y) * (b.y - a.y)
if dotproduct < 0:
return False
squaredlengthba = (b.x - a.x) * (b.x - a.x) + (b.y - a.y) * (b.y - a.y)
if dotproduct > squaredlengthba:
return False
return True
[docs]def shared_face(ivlist1, ivlist2):
for i in range(len(ivlist1) - 1):
iv1 = ivlist1[i]
iv2 = ivlist1[i + 1]
for i2 in range(len(ivlist2) - 1):
if ivlist2[i2 : i2 + 1] == [iv2, iv1]:
return True
return False
[docs]def segment_face(ivert, ivlist1, ivlist2, vertices):
"""
Check the vertex lists for cell 1 and cell 2. Add a new vertex to cell 1
if necessary.
Parameters
----------
ivert : int
vertex number to check
ivlist1 : list
list of vertices for cell 1. Add a new vertex to this cell if needed.
ivlist2 : list
list of vertices for cell2.
vertices : ndarray
array of x, y vertices
Returns
-------
segmented : bool
Return True if a face in cell 1 was split up by adding a new vertex
"""
# go through ivlist1 and find faces that have ivert
faces_to_check = []
for ipos in range(len(ivlist1) - 1):
face = (ivlist1[ipos], ivlist1[ipos + 1])
if ivert in face:
faces_to_check.append(face)
# go through ivlist2 and find points to check
points_to_check = []
for ipos in range(len(ivlist2) - 1):
if ivlist2[ipos] == ivert:
points_to_check.append(ivlist2[ipos + 1])
elif ivlist2[ipos + 1] == ivert:
points_to_check.append(ivlist2[ipos])
for face in faces_to_check:
iva, ivb = face
xa, ya = vertices[iva]
xb, yb = vertices[ivb]
for ivc in points_to_check:
if ivc not in face:
a = Point(xa, ya)
b = Point(xb, yb)
x, y = vertices[ivc]
c = Point(x, y)
if isBetween(a, b, c):
ipos = ivlist1.index(ivb)
if ipos == 0:
ipos = len(ivlist1) - 1
ivlist1.insert(ipos, ivc)
return True
return False
[docs]def to_cvfd(
vertdict,
nodestart=None,
nodestop=None,
skip_hanging_node_check=False,
duplicate_decimals=9,
verbose=False,
detect_non_convergence=True,
):
"""
Convert a vertex dictionary into verts and iverts
Parameters
----------
vertdict
A dictionary {icell: [(x1, y1), (x2, y2), (x3, y3), ...]}
nodestart : int
Starting node number. (default is zero)
nodestop : int
Ending node number up to but not including. (default is len(vertdict))
skip_hanging_node_check : bool
Skip the hanging node check. The hanging node check is designed for
quad-based grid refinement (e.g., quadtree grids) where larger cells
are subdivided, creating "hanging nodes" that need to be added to
neighboring cells. The check may not converge for grids with floating
point precision artifacts like nearly duplicate vertices. Set this
option True to avoid this. If False and non-convergence is detected
(assuming detect_non_convergence is True), a warning will be issued.
duplicate_decimals : int
Decimals to round duplicate vertex checks. GRIDGEN can occasionally
produce very-nearly overlapping vertices, this can be used to change
the sensitivity for filtering out duplicates. (default is 9)
verbose : bool
Print messages to the screen. (default is False)
detect_non_convergence : bool
Enable automatic detection of non-convergent hanging node checks.
When enabled, the algorithm monitors segmentation counts across
iterations. If counts remain high and are flat or increasing, stop
early and issue a warning. This prevents infinite loops while still
allowing slow-but-steady decrease to converge for complex grids.
Set False to disable this check and allow unlimited iterations.
(default is True)
Returns
-------
verts : ndarray
array of x, y vertices
iverts : list
list containing a list for each cell
"""
if nodestart is None:
nodestart = 0
if nodestop is None:
nodestop = len(vertdict)
ncells = nodestop - nodestart
# First create vertexdict {(x1, y1): ivert1, (x2, y2): ivert2, ...} and
# vertexlist [[ivert1, ivert2, ...], [ivert9, ivert10, ...], ...]
# In the process, filter out any duplicate vertices
vertexdict = {}
vertexlist = []
xcyc = np.empty((ncells, 2), dtype=float)
iv = 0
nvertstart = 0
if verbose:
print("Converting vertdict to cvfd representation.")
print(f"Number of cells in vertdict is: {len(vertdict)}")
print(
f"Cell {nodestart} up to {nodestop} (but not including) will be processed."
)
for icell in range(nodestart, nodestop):
points = vertdict[icell]
nvertstart += len(points)
xc, yc = centroid_of_polygon(points)
xcyc[icell, 0] = xc
xcyc[icell, 1] = yc
ivertlist = []
for p in points:
pt = (
round(p[0], duplicate_decimals),
round(p[1], duplicate_decimals),
)
if pt in vertexdict:
ivert = vertexdict[pt]
else:
vertexdict[pt] = iv
ivert = iv
iv += 1
ivertlist.append(ivert)
if ivertlist[0] != ivertlist[-1]:
raise Exception(f"Cell {icell} not closed")
vertexlist.append(ivertlist)
# next create vertex_cell_dict = {}; for each vertex, store list of cells
# that use it
nvert = len(vertexdict)
if verbose:
print(f"Started with {nvertstart} vertices.")
print(f"Ended up with {nvert} vertices.")
print(f"Reduced total number of vertices by {nvertstart - nvert}")
print("Creating dict of vertices with their associated cells")
vertex_cell_dict = {}
for icell in range(nodestart, nodestop):
ivertlist = vertexlist[icell]
for ivert in ivertlist:
if ivert in vertex_cell_dict:
if icell not in vertex_cell_dict[ivert]:
vertex_cell_dict[ivert].append(icell)
else:
vertex_cell_dict[ivert] = [icell]
if verbose:
print("Done creating dict of vertices with their associated cells")
# Now, go through each vertex and look at the cells that use the vertex.
# For quadtree-like grids, there may be a need to add a new hanging node
# vertex to the larger cell.
vertexdict_keys = list(vertexdict.keys())
if not skip_hanging_node_check:
if verbose:
print("Checking for hanging nodes.")
finished = False
iteration = 0
segmentation_history = []
non_convergence_threshold = 3
while not finished:
finished = True
iteration += 1
segmentations_this_iter = 0
for ivert, cell_list in vertex_cell_dict.items():
for icell1 in cell_list:
for icell2 in cell_list:
# skip if same cell
if icell1 == icell2:
continue
# skip if share face already
ivertlist1 = vertexlist[icell1]
ivertlist2 = vertexlist[icell2]
if shared_face(ivertlist1, ivertlist2):
continue
# don't share a face, so need to segment if necessary
segmented = segment_face(
ivert,
ivertlist1,
ivertlist2,
vertexdict_keys,
)
if segmented:
finished = False
segmentations_this_iter += 1
segmentation_history.append(segmentations_this_iter)
if (
detect_non_convergence
and len(segmentation_history) >= non_convergence_threshold
):
recent_counts = segmentation_history[-non_convergence_threshold:]
first_count = recent_counts[0]
last_count = recent_counts[-1]
# Check if the count is flat or increasing
# but only if we're doing substantial work
min_segmentations_threshold = 50
if (
first_count >= min_segmentations_threshold
and last_count >= first_count
):
warnings.warn(
f"Hanging node check stopped after {iteration} iterations "
f"due to non-convergence (segmentation counts are flat or "
f"increasing at {segmentations_this_iter} per iteration). "
f"This typically occurs with complex Voronoi or other "
f"unstructured grids. Set skip_hanging_node_check=True "
f"for these grid types.",
UserWarning,
)
break
if verbose:
if finished:
print(f"Done checking for hanging nodes after {iteration} iterations.")
else:
print(f"Aborted hanging node check after {iteration} iterations.")
verts = np.array(vertexdict_keys)
iverts = vertexlist
return verts, iverts
[docs]def shapefile_to_cvfd(shp, **kwargs):
import shapefile
print(f"Translating shapefile ({shp}) into cvfd format")
sf = shapefile.Reader(shp)
shapes = sf.shapes()
vertdict = {}
for icell, shape in enumerate(shapes):
points = shape.points
vertdict[icell] = points
verts, iverts = to_cvfd(vertdict, **kwargs)
return verts, iverts
[docs]def shapefile_to_xcyc(shp):
"""
Get cell centroid coordinates
Parameters
----------
shp : string
Name of shape file
Returns
-------
xcyc : ndarray
x, y coordinates of all polygons in shp
"""
import shapefile
print(f"Translating shapefile ({shp}) into cell centroids")
sf = shapefile.Reader(shp)
shapes = sf.shapes()
ncells = len(shapes)
xcyc = np.empty((ncells, 2), dtype=float)
for icell, shape in enumerate(shapes):
points = shape.points
xc, yc = centroid_of_polygon(points)
xcyc[icell, 0] = xc
xcyc[icell, 1] = yc
return xcyc
[docs]def gridlist_to_verts(gridlist):
"""
Take a list of flopy structured model grids and convert them into vertices.
The idomain can be set to remove cells in a parent grid. Cells from a
child grid will patched in to make a single set of vertices. Cells will
be numbered according to consecutive numbering of active cells in the
grid list.
Parameters
----------
gridlist : list
List of flopy.discretization.modelgrid. Must be of type structured
grids
Returns
-------
verts, iverts : np.ndarray, list
vertices and list of cells and which vertices comprise the cells
"""
vertdict = {}
icell = 0
for sg in gridlist:
ilays, irows, icols = np.asarray(sg.idomain > 0).nonzero()
for _, i, j in zip(ilays, irows, icols):
v = sg.get_cell_vertices(i, j)
vertdict[icell] = v + [v[0]]
icell += 1
verts, iverts = to_cvfd(vertdict, verbose=False)
return verts, iverts
[docs]def get_disv_gridprops(verts, iverts, xcyc=None):
"""
Calculate disv grid properties from verts and iverts
Parameters
----------
verts : ndarray
2d array of x, y vertices
iverts : list
list of size ncpl, with a list of vertex numbers for each cell
Returns
-------
gridprops : dict
Dictionary containing entries that can be passed directly into the
modflow6 disv package.
"""
nvert = verts.shape[0]
ncpl = len(iverts)
if xcyc is None:
xcyc = np.empty((ncpl, 2), dtype=float)
for icell in range(ncpl):
vlist = [(verts[ivert, 0], verts[ivert, 1]) for ivert in iverts[icell]]
xcyc[icell, 0], xcyc[icell, 1] = centroid_of_polygon(vlist)
else:
assert xcyc.shape == (ncpl, 2)
vertices = []
for i in range(nvert):
vertices.append((i, verts[i, 0], verts[i, 1]))
cell2d = []
for i in range(ncpl):
cell2d.append([i, xcyc[i, 0], xcyc[i, 1], len(iverts[i])] + list(iverts[i]))
gridprops = {}
gridprops["ncpl"] = ncpl
gridprops["nvert"] = nvert
gridprops["vertices"] = vertices
gridprops["cell2d"] = cell2d
return gridprops