Source code for flopy.utils.postprocessing

import numpy as np


class _GridAdapter:
    """
    Temporary workaround to normalize grid-specific array access patterns.

    Different grid types (DIS, DISV, DISU) store arrays in different shapes.
    Adapters provide a uniformly shaped interface, (nlay, ncells_per_layer).

    Currently this is used only in get_transmissivities(). It should not be
    necessary anymore in 4.x.
    """

    def __init__(self, model, dis_package, nlay):
        self.model = model
        self.dis = dis_package
        self.nlay = nlay

    def reshape_if_needed(self, array, target_shape):
        """Reshape 1D array to 2D if needed."""
        if array.ndim == 1 and len(target_shape) == 2:
            return array.reshape(target_shape)
        return array

    def get_k_array(self, paklist):
        """
        Return hydraulic conductivity array in shape (nlay, ncells_per_layer).

        Parameters
        ----------
        paklist : list
            List of package names in the model
        """
        # Check for flow packages in order of preference
        if "LPF" in paklist:
            return self.model.lpf.hk.array
        elif "UPW" in paklist:
            return self.model.upw.hk.array
        elif "NPF" in paklist:
            return self._get_npf_k_array()
        else:
            raise ValueError("No LPF, UPW, or NPF package.")

    def _get_npf_k_array(self):
        """Get NPF k array. Subclasses may override for grid-specific handling."""
        return self.model.npf.k.array

    def get_bottom_array(self):
        """Return bottom elevation array in shape (nlay, ncells_per_layer)."""
        raise NotImplementedError

    def get_top_for_slice(self, indices, grid_type):
        """Return model top elevation for the given cell indices."""
        raise NotImplementedError

    def get_layer_tops(self, botm_sliced, indices, grid_type):
        """Return top elevation for each layer at the given indices."""
        raise NotImplementedError

    def normalize_heads_array(self, heads):
        """
        Normalize heads array to (nlay, ncells_per_layer) shape if needed.

        Parameters
        ----------
        heads : ndarray
            Heads array in any valid format for this grid type

        Returns
        -------
        ndarray
            Heads array in (nlay, ncells_per_layer) shape
        """
        # Default: no normalization needed
        return heads


class _DisAdapter(_GridAdapter):
    """Adapter for structured (DIS) grids."""

    def get_bottom_array(self):
        return self.dis.botm.array

    def get_top_for_slice(self, indices, grid_type):
        return self.dis.top.array[indices]

    def get_layer_tops(self, botm_sliced, indices, grid_type):
        tops = np.empty_like(botm_sliced, dtype=float)
        tops[0, :] = self.dis.top.array[indices]
        tops[1:, :] = botm_sliced[:-1]
        return tops


class _DisvAdapter(_GridAdapter):
    """Adapter for vertex (DISV) grids."""

    def get_bottom_array(self):
        return self.dis.botm.array

    def get_top_for_slice(self, indices, grid_type):
        # DISV top is (ncpl,), indices is a tuple with one element
        return self.dis.top.array[indices[0]]

    def get_layer_tops(self, botm_sliced, indices, grid_type):
        tops = np.empty_like(botm_sliced, dtype=float)
        tops[0, :] = self.dis.top.array[indices[0]]
        tops[1:, :] = botm_sliced[:-1]
        return tops


class _DisuAdapter(_GridAdapter):
    """Adapter for unstructured (DISU) grids."""

    def _get_npf_k_array(self):
        """Get NPF k array, reshaping from (nodes,) to (nlay, ncpl) if needed."""
        k_array = self.model.npf.k.array
        if k_array.ndim == 1:
            return k_array.reshape((self.nlay, -1))
        return k_array

    def get_bottom_array(self):
        bot_array = self.dis.bot.array
        if bot_array.ndim == 1:
            return bot_array.reshape((self.nlay, -1))
        return bot_array

    def get_top_for_slice(self, indices, grid_type):
        # DISU top is per-node (nodes,), reshape to (nlay, ncpl) and get layer 0
        top_array = self.dis.top.array
        if top_array.ndim == 1:
            top_array = top_array.reshape((self.nlay, -1))
        return top_array[0, indices[0]]

    def get_layer_tops(self, botm_sliced, indices, grid_type):
        # DISU has per-node tops, so each layer has different top values
        tops = np.empty_like(botm_sliced, dtype=float)
        top_array = self.dis.top.array
        if top_array.ndim == 1:
            top_array = top_array.reshape((self.nlay, -1))
        tops[0, :] = top_array[0, indices[0]]
        tops[1:, :] = botm_sliced[:-1]
        return tops

    def normalize_heads_array(self, heads):
        """Normalize heads from flat (nnodes,) to (nlay, ncpl) if needed."""
        if heads.ndim == 1:
            return heads.reshape((self.nlay, -1))
        return heads


def _get_grid_adapter(model, nlay):
    """
    Get the appropriate grid adapter for the model.

    Parameters
    ----------
    model : flopy.modflow.Modflow or flopy.mf6.ModflowGwf object
        Model object
    nlay : int
        Number of layers

    Returns
    -------
    adapter : _GridAdapter subclass instance
        Grid-specific adapter
    dis_package : discretization package
        The DIS, DISV, or DISU package
    """
    paklist = model.get_package_list()

    if "DISU" in paklist:
        return _DisuAdapter(model, model.disu, nlay)
    elif "DISV" in paklist:
        return _DisvAdapter(model, model.disv, nlay)
    elif "DIS" in paklist:
        return _DisAdapter(model, model.dis, nlay)
    else:
        # For older MODFLOW versions, default to DIS adapter
        return _DisAdapter(model, model.dis, nlay)


[docs]def get_transmissivities( heads, m, r=None, c=None, x=None, y=None, sctop=None, scbot=None, nodata=-999, ): """ Computes transmissivity in each model layer at specified locations and open intervals. A saturated thickness is determined for each row, column or x, y location supplied, based on the open interval (sctop, scbot), if supplied, otherwise the layer tops and bottoms and the water table are used. Parameters ---------- heads : 2D array OR 3D array numpy array of shape nlay by n locations (2D) OR complete heads array with the correct shape for structured grids (nlay, nrow, ncol) or for vertex grids (nlay, ncpl) or unstructured grids (nnodes). If None, the system is assumed to be fully saturated (heads at layer tops). m : flopy.modflow.Modflow or flopy.mf6.ModflowGwf object Must have dis, disv, or disu and lpf, upw, or npf packages. r : 1D array-like of ints, of length n locations row indices (optional; alternately specify x, y). Only valid for structured grids. c : 1D array-like of ints, of length n locations column indices (optional; alternately specify x, y). Only valid for structured grids. x : 1D array-like of floats, of length n locations x locations in real world coordinates (optional). y : 1D array-like of floats, of length n locations y locations in real world coordinates (optional). sctop : 1D array-like of floats, of length n locations open interval tops (optional; default is model top) scbot : 1D array-like of floats, of length n locations open interval bottoms (optional; default is model bottom) nodata : numeric optional; locations where heads=nodata will be assigned T=0 Returns ------- T : 2D array of same shape as heads (nlay x n locations) Transmissivities in each layer at each location Notes ----- For structured grids, locations can be specified with either: - r, c (row, column indices) - x, y (real world coordinates) For vertex and unstructured grids, only x, y coordinates are supported. When heads=None, the system is interpreted as fully saturated, with heads set to the model top for all layers. This represents maximum possible saturation and is useful for calculating maximum transmissivities. Examples -------- >>> T = get_transmissivities(heads, model, r=[0, 1], c=[0, 1]) >>> T = get_transmissivities(heads, model, x=[100.0, 200.0], y=[50.0, 150.0]) >>> T = get_transmissivities(None, model, r=[0, 1], c=[0, 1]) # fully saturated """ # get grid dims if (modelgrid := getattr(m, "modelgrid", None)) is not None: grid_type = modelgrid.grid_type nlay = m.modelgrid.nlay if grid_type == "structured": nrow = m.modelgrid.nrow ncol = m.modelgrid.ncol elif (ncpl := getattr(m.modelgrid, "ncpl", None)) is None: raise ValueError(f"Unsupported grid type: {grid_type}") else: grid_type = "structured" nlay = m.nlay nrow = m.nrow ncol = m.ncol # get grid adapter adapter = _get_grid_adapter(m, nlay) # get slicing indices if r is not None and c is not None: if grid_type != "structured": raise ValueError("r, c parameters only valid for structured grids") indices = (np.atleast_1d(r), np.atleast_1d(c)) elif x is not None and y is not None: points = zip(np.atleast_1d(x), np.atleast_1d(y)) if grid_type == "structured": indices = [m.modelgrid.intersect(xi, yi) for xi, yi in points] indices = tuple(np.array(idcs) for idcs in zip(*indices)) else: indices = (np.array([m.modelgrid.intersect(xi, yi) for xi, yi in points]),) else: raise ValueError("Must specify r, c indices or x, y locations.") # get k array using adapter (handles all flow packages) paklist = m.get_package_list() k_array = adapter.get_k_array(paklist) hk = k_array[(slice(None),) + indices] # get and slice bottom array botm_array = adapter.get_bottom_array() botm = botm_array[(slice(None),) + indices] # handle fully saturated case (heads=None) if heads is None: model_top = adapter.get_top_for_slice(indices, grid_type) heads = np.tile(model_top, (nlay, 1)) # normalize and slice heads using adapter heads = adapter.normalize_heads_array(heads) if grid_type == "structured" and heads.shape == (nlay, nrow, ncol): heads = heads[(slice(None),) + indices] elif grid_type != "structured" and heads.shape == (nlay, ncpl): heads = heads[(slice(None),) + indices] if heads.shape != botm.shape: raise ValueError("Shape of heads array must be nlay x nhyd") # open interval tops/bottoms default to model top/bottom if sctop is None: sctop = adapter.get_top_for_slice(indices, grid_type) if scbot is None: scbot = botm[-1, :] tops = adapter.get_layer_tops(botm, indices, grid_type) # expand top and bottom arrays to be same shape as botm, thickness, etc. # (so we have an open interval value for each layer) sctoparr = np.zeros(botm.shape) sctoparr[:] = sctop scbotarr = np.zeros(botm.shape) scbotarr[:] = scbot # start with layer tops # set tops above heads to heads # set tops above screen top to screen top # (we only care about the saturated open interval) openinvtop = tops.copy() openinvtop[openinvtop > heads] = heads[openinvtop > heads] openinvtop[openinvtop > sctoparr] = sctoparr[openinvtop > sctop] # start with layer bottoms # set bottoms below screened interval to screened interval bottom # set screen bottoms below bottoms to layer bottoms openinvbotm = botm.copy() openinvbotm[openinvbotm < scbotarr] = scbotarr[openinvbotm < scbot] openinvbotm[scbotarr < botm] = botm[scbotarr < botm] # compute thickness of open interval in each layer thick = openinvtop - openinvbotm # assign open intervals above or below model to closest cell in column not_in_layer = np.sum(thick < 0, axis=0) not_in_any_layer = not_in_layer == thick.shape[0] for i, n in enumerate(not_in_any_layer): if n: closest = np.argmax(thick[:, i]) thick[closest, i] = 1.0 thick[thick < 0] = 0 thick[heads == nodata] = 0 # exclude nodata cells # compute transmissivities T = thick * hk return T
[docs]def get_water_table(heads, hdry=-1e30, hnoflo=1e30, masked_values=None): """ Get a 2D array representing the water table elevation for each stress period in heads array. Parameters ---------- heads : 3 or 4-D np.ndarray Heads array. hdry : real The head that is assigned to cells that are converted to dry during a simulation. By default, -1e30. hnoflo : real The value of head assigned to all inactive (no flow) cells throughout the simulation, including vertical pass-through cells in MODFLOW 6. By default, 1e30. masked_values : list List of any values (in addition to hdry and hnoflo) that should be masked in the water table calculation. Returns ------- wt : 2 or 3-D np.ndarray of water table elevations for each stress period. """ heads = np.array(heads, ndmin=4) mask = (heads == hdry) | (heads == hnoflo) if masked_values is not None: for val in masked_values: mask = mask | (heads == val) k = (~mask).argmax(axis=1) per, i, j = np.indices(k.shape) wt = heads[per.ravel(), k.ravel(), i.ravel(), j.ravel()].reshape(k.shape) wt = np.squeeze(wt) mask = (wt == hdry) | (wt == hnoflo) if masked_values is not None: for val in masked_values: mask = mask | (wt == val) wt = np.ma.masked_array(wt, mask) return wt
[docs]def get_gradients(heads, m, nodata, per_idx=None): """ Calculates the hydraulic gradients from the heads array for each stress period. Parameters ---------- heads : 3 or 4-D np.ndarray Heads array. m : flopy.modflow.Modflow object Must have a flopy.modflow.ModflowDis object attached. nodata : real HDRY value indicating dry cells. per_idx : int or sequence of ints stress periods to return. If None, returns all stress periods (default). Returns ------- grad : 3 or 4-D np.ndarray Array of hydraulic gradients """ # internal calculations done on a masked array heads = np.ma.array(heads, ndmin=4, mask=heads == nodata) nper, nlay, nrow, ncol = heads.shape if per_idx is None: per_idx = list(range(nper)) elif np.isscalar(per_idx): per_idx = [per_idx] grad = [] for per in per_idx: hds = heads[per] zcnt_per = np.ma.array(m.dis.zcentroids, mask=hds.mask) unsat = zcnt_per > hds zcnt_per[unsat] = hds[unsat] # apply .diff on data and mask components separately diff_mask = np.diff(hds.mask, axis=0) dz = np.ma.array(np.diff(zcnt_per.data, axis=0), mask=diff_mask) dh = np.ma.array(np.diff(hds.data, axis=0), mask=diff_mask) # convert to nan-filled array, as is expected(!?) grad.append((dh / dz).filled(np.nan)) return np.squeeze(grad)
[docs]def get_extended_budget( cbcfile, precision="single", idx=None, kstpkper=None, totim=None, boundary_ifaces=None, hdsfile=None, model=None, ): """ Get the flow rate across cell faces including potential stresses applied along boundaries at a given time. Only implemented for "classical" MODFLOW versions where the budget is recorded as FLOW RIGHT FACE, FLOW FRONT FACE and FLOW LOWER FACE arrays. Parameters ---------- cbcfile : str Cell by cell file produced by Modflow. precision : str Binary file precision, default is 'single'. idx : int or list The zero-based record number. kstpkper : tuple of ints A tuple containing the time step and stress period (kstp, kper). The kstp and kper values are zero based. totim : float The simulation time. boundary_ifaces : dictionary {str: int or list} A dictionary defining how to treat stress flows at boundary cells. The keys are budget terms corresponding to stress packages (same term as in the overall volumetric budget printed in the listing file). The values are either a single iface number to be applied to all cells for the stress package, or a list of lists describing individual boundary cells in the same way as in the package input plus the iface number appended. The iface number indicates the face to which the stress flow is assigned, following the MODPATH convention (see MODPATH user guide). Example: boundary_ifaces = { 'RECHARGE': 6, 'RIVER LEAKAGE': 6, 'CONSTANT HEAD': [[lay, row, col, iface], ...], 'WELLS': [[lay, row, col, flux, iface], ...], 'HEAD DEP BOUNDS': [[lay, row, col, head, cond, iface], ...]}. Note: stresses that are not informed in boundary_ifaces are implicitly treated as internally-distributed sinks/sources. hdsfile : str Head file produced by MODFLOW (only required if boundary_ifaces is used). model : flopy.modflow.Modflow object Modflow model instance (only required if boundary_ifaces is used). Returns ------- (Qx_ext, Qy_ext, Qz_ext) : tuple Flow rates across cell faces. Qx_ext is a ndarray of size (nlay, nrow, ncol + 1). Qy_ext is a ndarray of size (nlay, nrow + 1, ncol). The sign is such that the y axis is considered to increase in the north direction. Qz_ext is a ndarray of size (nlay + 1, nrow, ncol). The sign is such that the z axis is considered to increase in the upward direction. """ import flopy.utils.binaryfile as bf import flopy.utils.formattedfile as fm # define useful stuff if isinstance(cbcfile, bf.CellBudgetFile): cbf = cbcfile else: cbf = bf.CellBudgetFile(cbcfile) nlay, nrow, ncol = cbf.nlay, cbf.nrow, cbf.ncol rec_names = cbf.get_unique_record_names(decode=True) err_msg = " not found in the budget file." # get flow across right face Qx_ext = np.zeros((nlay, nrow, ncol + 1), dtype=np.float32) if ncol > 1: budget_term = "FLOW RIGHT FACE" matched_name = [s for s in rec_names if budget_term in s] if not matched_name: raise RuntimeError(budget_term + err_msg) frf = cbf.get_data(idx=idx, kstpkper=kstpkper, totim=totim, text=budget_term) Qx_ext[:, :, 1:] = frf[0] # SWI2 package budget_term_swi = "SWIADDTOFRF" matched_name_swi = [s for s in rec_names if budget_term_swi in s] if matched_name_swi: frf_swi = cbf.get_data( idx=idx, kstpkper=kstpkper, totim=totim, text=budget_term_swi ) Qx_ext[:, :, 1:] += frf_swi[0] # get flow across front face Qy_ext = np.zeros((nlay, nrow + 1, ncol), dtype=np.float32) if nrow > 1: budget_term = "FLOW FRONT FACE" matched_name = [s for s in rec_names if budget_term in s] if not matched_name: raise RuntimeError(budget_term + err_msg) fff = cbf.get_data(idx=idx, kstpkper=kstpkper, totim=totim, text=budget_term) Qy_ext[:, 1:, :] = -fff[0] # SWI2 package budget_term_swi = "SWIADDTOFFF" matched_name_swi = [s for s in rec_names if budget_term_swi in s] if matched_name_swi: fff_swi = cbf.get_data( idx=idx, kstpkper=kstpkper, totim=totim, text=budget_term_swi ) Qy_ext[:, 1:, :] -= fff_swi[0] # get flow across lower face Qz_ext = np.zeros((nlay + 1, nrow, ncol), dtype=np.float32) if nlay > 1: budget_term = "FLOW LOWER FACE" matched_name = [s for s in rec_names if budget_term in s] if not matched_name: raise RuntimeError(budget_term + err_msg) flf = cbf.get_data(idx=idx, kstpkper=kstpkper, totim=totim, text=budget_term) Qz_ext[1:, :, :] = -flf[0] # SWI2 package budget_term_swi = "SWIADDTOFLF" matched_name_swi = [s for s in rec_names if budget_term_swi in s] if matched_name_swi: flf_swi = cbf.get_data( idx=idx, kstpkper=kstpkper, totim=totim, text=budget_term_swi ) Qz_ext[1:, :, :] -= flf_swi[0] # deal with boundary cells if boundary_ifaces is not None: # need calculated heads for some stresses and to check hnoflo and hdry if hdsfile is None: raise ValueError("hdsfile must be provided when using boundary_ifaces") if isinstance(hdsfile, (bf.HeadFile, fm.FormattedHeadFile)): hds = hdsfile else: try: hds = bf.HeadFile(hdsfile) except: hds = fm.FormattedHeadFile(hdsfile, precision=precision) head = hds.get_data(idx=idx, kstpkper=kstpkper, totim=totim) # get hnoflo and hdry values if model is None: raise ValueError("model must be provided when using boundary_ifaces") noflo_or_dry = np.logical_or(head == model.hnoflo, head == model.hdry) for budget_term, iface_info in boundary_ifaces.items(): # look for budget term in budget file matched_name = [s for s in rec_names if budget_term in s] if not matched_name: raise RuntimeError( f'Budget term {budget_term} not found in "{cbcfile}" file.' ) if len(matched_name) > 1: raise RuntimeError( "Budget term " + budget_term + " found" " in several record names. Use a more " " precise name." ) Q_stress = cbf.get_data( idx=idx, kstpkper=kstpkper, totim=totim, text=matched_name[0], full3D=True, )[0] # remove potential leading and trailing spaces budget_term = budget_term.strip() # weirdly, MODFLOW puts recharge in all potential recharge cells # and not only the actual cells; thus, correct this by putting 0 # away from water table cells if budget_term == "RECHARGE": # find the water table as the first active cell in each column water_table = np.full((nlay, nrow, ncol), False) water_table[0, :, :] = np.logical_not(noflo_or_dry[0, :, :]) already_found = water_table[0, :, :] for lay in range(1, nlay): if np.sum(already_found) == nrow * ncol: break water_table[lay, :, :] = np.logical_and( np.logical_not(noflo_or_dry[lay, :, :]), np.logical_not(already_found), ) already_found = np.logical_or(already_found, water_table[lay, :, :]) Q_stress[np.logical_not(water_table)] = 0.0 # case where the same iface is assigned to all cells if isinstance(iface_info, int): if iface_info == 1: Qx_ext[:, :, :-1] += Q_stress elif iface_info == 2: Qx_ext[:, :, 1:] -= Q_stress elif iface_info == 3: Qy_ext[:, 1:, :] += Q_stress elif iface_info == 4: Qy_ext[:, :-1, :] -= Q_stress elif iface_info == 5: Qz_ext[1:, :, :] += Q_stress elif iface_info == 6: Qz_ext[:-1, :, :] -= Q_stress # case where iface is assigned individually per cell elif isinstance(iface_info, list): # impose a unique iface (normally = 6) for some stresses # (note: UZF RECHARGE, GW ET and SURFACE LEAKAGE are all # related to the UZF package) if ( budget_term == "RECHARGE" or budget_term == "ET" or budget_term == "UZF RECHARGE" or budget_term == "GW ET" or budget_term == "SURFACE LEAKAGE" ): raise ValueError( "This function imposes the use of a " "unique iface (normally = 6) for the " "{} budget term.".format(budget_term) ) # loop through boundary cells for cell_info in iface_info: lay, row, col = cell_info[0], cell_info[1], cell_info[2] if noflo_or_dry[lay, row, col]: continue iface = cell_info[-1] # Here, where appropriate, we recalculate Q_stress_cell # using package input. This gives more flexibility than # directly taking the value saved by MODFLOW. Indeed, it # allows for a same type of stress to be applied several # times to the same cell but to different faces # (whereas MODFLOW only saves one lumped value per # stress type per cell). # Note: this flexibility is not supported for: # - FHB package (we would need to interpolate inputs # across time steps as done in the package; complicated) # - RES package (we would need to interpolate inputs # across time steps as done in the package; complicated) # - STR package (we would need first to retrieve river # stage from model outputs; complicated) # - SFR1 package (we would need to retrieve river # stage and conductance from model outputs; complicated) # - SFR2 package (even more complicated than SFR1) # - LAK3 package (we would need to retrieve lake # stage and conductance from model outputs; complicated) # - MNW1 package (we would need to retrieve well head and # conductance from model outputs; complicated) # - MNW2 package (even more complicated than MNW1) if budget_term == "WELLS": Q_stress_cell = cell_info[3] elif budget_term == "HEAD DEP BOUNDS": ghb_head = cell_info[3] ghb_cond = cell_info[4] model_head = head[lay, row, col] Q_stress_cell = ghb_cond * (ghb_head - model_head) elif budget_term == "RIVER LEAKAGE": riv_stage = cell_info[3] riv_cond = cell_info[4] riv_rbot = cell_info[5] model_head = head[lay, row, col] if model_head > riv_rbot: Q_stress_cell = riv_cond * (riv_stage - model_head) else: Q_stress_cell = riv_cond * (riv_stage - riv_rbot) elif budget_term == "DRAINS": drn_stage = cell_info[3] drn_cond = cell_info[4] model_head = head[lay, row, col] if model_head > drn_stage: Q_stress_cell = drn_cond * (drn_stage - model_head) else: continue # Else, take the value saved by MODFLOW. # This includes the budget terms: # - 'CONSTANT HEAD' for: # * head specified through -1 in IBOUND # * head specified in CHD package # * head specified in FHB package # - 'SPECIFIED FLOWS' for flow specified in FHB package # - 'RESERV. LEAKAGE' for RES package # - 'STREAM LEAKAGE' for: # * STR package # * SFR1 package # * SFR2 package # - 'LAKE SEEPAGE' for LAK3 package # - 'MNW' for MNW1 package # - 'MNW2' for MNW2 package # - 'SWIADDTOCH' for SWI2 package else: Q_stress_cell = Q_stress[lay, row, col] if iface == 1: Qx_ext[lay, row, col] += Q_stress_cell elif iface == 2: Qx_ext[lay, row, col + 1] -= Q_stress_cell elif iface == 3: Qy_ext[lay, row + 1, col] += Q_stress_cell elif iface == 4: Qy_ext[lay, row, col] -= Q_stress_cell elif iface == 5: Qz_ext[lay + 1, row, col] += Q_stress_cell elif iface == 6: Qz_ext[lay, row, col] -= Q_stress_cell else: raise TypeError("boundary_ifaces value must be either int or list.") return Qx_ext, Qy_ext, Qz_ext
[docs]def get_specific_discharge( vectors, model, head=None, position="centers", ): """ Get the discharge vector at cell centers at a given time. For "classical" MODFLOW versions, we calculate it from the flow rate across cell faces. For MODFLOW 6, we directly take it from MODFLOW output (this requires setting the option "save_specific_discharge" in the NPF package). Parameters ---------- vectors : tuple, np.recarray either a tuple of (flow right face, flow front face, flow lower face) numpy arrays from a MODFLOW-2005 compatible Cell Budget File or a specific discharge recarray from a MODFLOW 6 Cell Budget File model : object flopy model object head : np.ndarray numpy array of head values for a specific model position : str Position at which the specific discharge will be calculated. Possible values are "centers" (default), "faces" and "vertices". Returns ------- (qx, qy, qz) : tuple Discharge vector. qx, qy, qz are ndarrays of size (nlay, nrow, ncol) for a structured grid or size (nlay, ncpl) for an unstructured grid. The sign of qy is such that the y axis is considered to increase in the north direction. The sign of qz is such that the z axis is considered to increase in the upward direction. Note: if a head array is provided, inactive and dry cells are set to NaN. """ classical_budget = False spdis, tqx, tqy, tqz = None, None, None, None modelgrid = model.modelgrid if head is not None: head.shape = modelgrid.shape if isinstance(vectors, (list, tuple)): classical_budget = True for ix, vector in enumerate(vectors): if vector is None: continue else: tshp = list(modelgrid.shape)[::-1] tshp[ix] += 1 ext_shape = tuple(tshp[::-1]) break if vectors[ix].shape == modelgrid.shape: tqx = np.zeros( (modelgrid.nlay, modelgrid.nrow, modelgrid.ncol + 1), dtype=np.float32 ) tqy = np.zeros( (modelgrid.nlay, modelgrid.nrow + 1, modelgrid.ncol), dtype=np.float32 ) tqz = np.zeros( (modelgrid.nlay + 1, modelgrid.nrow, modelgrid.ncol), dtype=np.float32 ) if vectors[0] is not None: tqx[:, :, 1:] = vectors[0] if vectors[1] is not None: tqy[:, 1:, :] = -vectors[1] if len(vectors) > 2 and vectors[2] is not None: tqz[1:, :, :] = -vectors[2] elif vectors[ix].shape == ext_shape: if vectors[0] is not None: tqx = vectors[0] if vectors[1] is not None: tqy = vectors[1] if len(vectors) > 2 and vectors[2] is not None: tqz = vectors[2] else: raise IndexError( "Classical budget components must have the same shape as the modelgrid" ) else: spdis = vectors if classical_budget: # get saturated thickness (head - bottom elev for unconfined layer) if head is None: saturated_thickness = modelgrid.remove_confining_beds( modelgrid.cell_thickness ) else: saturated_thickness = modelgrid.saturated_thickness( head, mask=[model.hdry, model.hnoflo] ) saturated_thickness.shape = modelgrid.shape # inform modelgrid of no-flow and dry cells modelgrid = model.modelgrid if modelgrid._idomain is None: modelgrid._idomain = model.dis.ibound if head is not None: noflo_or_dry = np.logical_or(head == model.hnoflo, head == model.hdry) modelgrid._idomain[noflo_or_dry] = 0 # get cross section areas along x delc = np.reshape(modelgrid.delc, (1, modelgrid.nrow, 1)) cross_area_x = delc * saturated_thickness # get cross section areas along y delr = np.reshape(modelgrid.delr, (1, 1, modelgrid.ncol)) cross_area_y = delr * saturated_thickness # get cross section areas along z cross_area_z = np.ones(modelgrid.shape) * delc * delr # calculate qx, qy, qz if position == "centers": qx = np.zeros(modelgrid.shape, dtype=np.float32) qy = np.zeros(modelgrid.shape, dtype=np.float32) cross_area_x = ( delc[:] * 0.5 * (saturated_thickness[:, :, :-1] + saturated_thickness[:, :, 1:]) ) cross_area_y = ( delr * 0.5 * (saturated_thickness[:, 1:, :] + saturated_thickness[:, :-1, :]) ) qx[:, :, 1:] = 0.5 * (tqx[:, :, 2:] + tqx[:, :, 1:-1]) / cross_area_x qx[:, :, 0] = 0.5 * tqx[:, :, 1] / cross_area_x[:, :, 0] qy[:, 1:, :] = 0.5 * (tqy[:, 2:, :] + tqy[:, 1:-1, :]) / cross_area_y qy[:, 0, :] = 0.5 * tqy[:, 1, :] / cross_area_y[:, 0, :] qz = 0.5 * (tqz[1:, :, :] + tqz[:-1, :, :]) / cross_area_z elif position == "faces" or position == "vertices": cross_area_x = modelgrid.array_at_faces(cross_area_x, "x") cross_area_y = modelgrid.array_at_faces(cross_area_y, "y") cross_area_z = modelgrid.array_at_faces(cross_area_z, "z") qx = tqx / cross_area_x qy = tqy / cross_area_y qz = tqz / cross_area_z else: raise ValueError(f'"{position}" is not a valid value for position') if position == "vertices": qx = modelgrid.array_at_verts(qx) qy = modelgrid.array_at_verts(qy) qz = modelgrid.array_at_verts(qz) else: if position != "centers": raise ValueError( f"MF6 vectors cannot be calculated at {position}, 'centers' " f"is the only supported option" ) nnodes = model.modelgrid.nnodes qx = np.full((nnodes), np.nan, dtype=np.float64) qy = np.full((nnodes), np.nan, dtype=np.float64) qz = np.full((nnodes), np.nan, dtype=np.float64) idx = np.array(spdis["node"]) - 1 qx[idx] = spdis["qx"] qy[idx] = spdis["qy"] qz[idx] = spdis["qz"] qx.shape = modelgrid.shape qy.shape = modelgrid.shape qz.shape = modelgrid.shape # set no-flow and dry cells to NaN if head is not None and position == "centers": noflo_or_dry = np.logical_or(head == model.hnoflo, head == model.hdry) qx[noflo_or_dry] = np.nan qy[noflo_or_dry] = np.nan qz[noflo_or_dry] = np.nan return qx, qy, qz