Source code for flopy.utils.hfb_util

import numpy as np
import pandas as pd

from .geometry import distance
from .geospatial_utils import GeoSpatialUtil
from .gridintersect import GridIntersect


def _min_distance_index(x0, x1, y0, y1):
    """
    Method to get the index of the minimum distance between points

    Parameters
    ----------
    x0 : np.ndarray
        array of x coordinates
    x1 : float
        x coordinate of a point
    y0 : np.ndarray
        array of y coordinates
    y1 : float
        y coordinate of a point

    Returns
    -------
        int : index location
    """
    dist = distance(x0, y0, x1, y1)
    idx = np.where(dist == np.min(dist))[0]
    return idx


def _edge_length_lut(xyverts):
    """
    Method to create a look-up table based on index of
    grid cell edge lengths

    Parameters
    ----------
    xyverts: np.ndarray
        numpy array of xy coordinate pairs

    Returns
    -------
        dict
    """
    xy0 = xyverts.T[:, :-1]
    xy1 = xyverts.T[:, 1:]
    dist = distance(xy0[0], xy0[1], xy1[0], xy1[1])
    dist_lu = {}
    for v, d in enumerate(dist):
        if v < len(dist) - 1:
            dist_lu[(v, v + 1)] = d
        else:
            dist_lu[(0, v)] = d
    return dist_lu


def _minimize_hfb_deviance(idxs, xyverts, pts):
    """
    Method to minimumize the distance of the HFB fault trace from
    the intersection points of the fault and model grid cell. Used
    primarily for tie-breakers where there is not a clear "routing"
    option based on vertex to vertex edge length distance

    Parameters
    ----------
    idxs : iterable
        list, tuple, or numpy array of polygon vertex indices for test hfb path
    xyverts : np.array
        numpy array of (x,y) polygon vertices
    pts : np.array
        numpy array of (x,y) points of intersection between line and polygon

    Returns
    -------
        float : sum of minimum distances between edge centroids across a potential
        fault routing option and the intersection points between the fault line
        and grid node
    """
    lcs = []
    xyverts = xyverts.T
    pts = pts.T
    for ix in range(1, len(idxs)):
        ixx = (idxs[ix - 1], idxs[ix])
        xc = np.mean(xyverts[0, ixx])
        yc = np.mean(xyverts[1, ixx])
        lcs.append([xc, yc])

    mins = []
    for xc, yc in lcs:
        dists = distance(pts[0], pts[1], xc, yc)
        mins.append(np.min(dists))

    return np.sum(mins)


def _perturb_intersection_coords(idxs, xyverts, ipt, epsilon=1e-06):
    """
    Method to perturb an intersection location by a small amount which is used
    to handle colinear intersection and intersection at the midpoint of a cell edge.

    Parameters
    ----------
    idxs : list
        list of vertex indices
    xyverts : np.array
        numpy array of x,y vertices for a cell
    ipt : iterable
        intersection point x,y coordinate
    epsilon : float
        perturbation value

    Returns
    -------
        ipt : list holding x, y coordinate pair of the perturbed intersection
    """
    xyverts = xyverts.T
    xverts = xyverts[0][idxs]
    yverts = xyverts[1][idxs]

    if (xverts[0] - xverts[1]) == 0:
        # vertical line
        new_vrt = [xverts[0], np.mean(yverts) - epsilon]
    elif (yverts[0] - yverts[1]) == 0:
        # horizontal line
        new_vrt = [np.mean(xverts) - epsilon, yverts[0]]
    else:
        # need to adjust across the line
        m = (yverts[1] - yverts[0]) / [xverts[1] - xverts[0]]
        if xverts[0] != 0:
            vidx = 0
        else:
            vidx = 1
        b = yverts[vidx] / (m * xverts[vidx])
        cx = ipt[0] - epsilon
        cy = m * cx + b
        new_vrt = [cx, cy]

    ipt = new_vrt
    return ipt


def _edge_neighbors(modelgrid):
    """
    Method to get a dictionary of unique node edges (by ivert) and the nodes the
    edge is shared between

    Parameters
    ----------
    modelgrid : flopy.discretization.Grid object

    Returns
    -------
        dict : dictionary of {edge iverts : [nodes]}
    """
    node_num = 0
    geoms = []
    node_nums = []

    for poly in modelgrid.iverts:
        poly = [int(i) for i in poly]
        if poly[0] == poly[-1]:
            poly = poly[:-1]
        for v in range(len(poly)):
            geoms.append(tuple(sorted([poly[v - 1], poly[v]])))
        node_nums += [node_num] * len(poly)
        node_num += 1

    edge_nodes = {}
    for i, item in enumerate(geoms):
        if item not in edge_nodes:
            edge_nodes[item] = {
                node_nums[i],
            }
        else:
            edge_nodes[item].add(node_nums[i])

    return edge_nodes


[docs]def make_hfb_array(modelgrid, geom): """ Method to make a HFB recarray from geospatial linestring information. Note: this method was developed to only accept a single linestring at a time to give the user an opportunity to save unique fault information for calibration purposes. Parameters ---------- modelgrid : flopy.discretization.Grid object FloPy StructuredGrid, VertexGrid, and UnstructuredGrid are supported geom : geospatial object geom parameter is a geospatial LineString representation. Shapely.geometry.LineString, GeoJson, List of vertices, shapefile.shape types are supported. Returns ------- np.recarray numpy recarray of cellid1, cellid2, and hydchr. The hydchr field must be set by the user, is set to NaN, and is provided for convenience. """ gu = GeoSpatialUtil(geom, "LineString") if gu.shapetype.lower() != "linestring": raise AssertionError( f"{gu.shapetype} is not supported, only LineStrings are supported" ) geom = gu.points if modelgrid.idomain is not None: idomain = modelgrid.idomain.ravel() else: idomain = np.ones((modelgrid.nnodes,), dtype=int) edge_set = _edge_neighbors(modelgrid) vert_ivert = {tuple(i): cnt for cnt, i in enumerate(modelgrid.verts)} iverts = modelgrid.iverts xverts, yverts = modelgrid.cross_section_vertices verts = [] for node, xv in enumerate(xverts): yv = yverts[node] vrts = list(zip(xv, yv)) if len(vrts) == len(iverts[node]): if iverts[node][0] != iverts[node][-1]: vrts.append(vrts[0]) verts.append(np.array(vrts)) ixs = GridIntersect(modelgrid) result = ixs.intersect(geom, shapetype="LineString") result_adj = [] for record in result: node = record.cellid ixshp = record.ixshapes coords = np.array(ixshp.coords.xy).T x0, y0 = coords[0, 0], coords[0, 1] x1, y1 = coords[-1, 0], coords[-1, 1] xycell = verts[node] xcell = xycell.T[0][:-1] ycell = xycell.T[1][:-1] vidx0 = _min_distance_index(xcell, x0, ycell, y0) if len(vidx0) > 1: # perturb line by small epsilon coords[0] = _perturb_intersection_coords(vidx0, xycell, coords[0]) x0, y0 = coords[0, 0], coords[0, 1] vidx0 = _min_distance_index(xcell, x0, ycell, y0) vidx0 = vidx0[0] vidx1 = _min_distance_index(xcell, x1, ycell, y1) if len(vidx1) > 1: # perturb line by small epsilon coords[-1] = _perturb_intersection_coords(vidx1, xycell, coords[-1]) x1, y1 = coords[-1, 0], coords[-1, 1] vidx1 = _min_distance_index(xcell, x1, ycell, y1) vidx1 = vidx1[0] if vidx0 == vidx1: continue tmp = tuple(sorted([vidx0, vidx1])) if tmp[1] - tmp[0] > 1: nvert = len(xycell) - 1 elens = _edge_length_lut(xycell) # construct line routing options o1 = list(range(tmp[0], tmp[1] + 1)) o2 = list(range(tmp[1], nvert)) + list(range(0, tmp[0] + 1)) # calculate routing distance d1 = np.sum( [elens[tuple(sorted([o1[ix - 1], o1[ix]]))] for ix in range(1, len(o1))] ) d2 = np.sum( [elens[tuple(sorted([o2[ix - 1], o2[ix]]))] for ix in range(1, len(o2))] ) # evaluate distance and break ties if necessary if d1 < d2: tmp = o1 elif d2 < d1: tmp = o2 else: om1 = _minimize_hfb_deviance(o1, xycell, coords) om2 = _minimize_hfb_deviance(o2, xycell, coords) if om1 <= om2: tmp = o1 else: tmp = o2 edges = [] for ix in range(1, len(tmp)): eix0 = tmp[ix - 1] eix1 = tmp[ix] xyv0 = tuple(xycell[eix0]) xyv1 = tuple(xycell[eix1]) iv0 = vert_ivert[xyv0] iv1 = vert_ivert[xyv1] edges.append(tuple(sorted([iv0, iv1]))) hfb_neighs = [] for edge in edges: for n in edge_set[edge]: if n == node: continue hfb_neighs.append(n) res = [int(record.cellid), hfb_neighs] result_adj.append(res) hfb_data = [] visited = [] if modelgrid.nlay is not None and modelgrid.grid_type != "unstructured": for lay in range(modelgrid.nlay): ncpl_adj = lay * modelgrid.ncpl for cid0, hfb_neighs in result_adj: if not idomain[cid0 + ncpl_adj]: continue if modelgrid.grid_type == "structured": cellid0 = modelgrid.get_lrc(cid0 + ncpl_adj)[0] else: cellid0 = (lay, cid0) for cid1 in hfb_neighs: if not idomain[cid1 + ncpl_adj]: continue if modelgrid.grid_type == "structured": cellid1 = modelgrid.get_lrc(cid1 + ncpl_adj)[0] else: cellid1 = (lay, cid1) if cellid0 + cellid1 in visited: continue elif cellid1 + cellid0 in visited: continue visited.append(cellid0 + cellid1) hfb_data.append((cellid0, cellid1)) else: for cellid0, hfb_neighs in result_adj: if not idomain[cellid0]: continue for cellid1 in hfb_neighs: if not idomain[cellid1]: continue if (cellid0, cellid1) in visited: continue elif (cellid1, cellid0) in visited: continue visited.append((cellid0, cellid1)) hfb_data.append(((cellid0,), (cellid1,))) df = pd.DataFrame(hfb_data, columns=["cellid1", "cellid2"]) df["hydchr"] = np.nan return df.to_records(index=False)