import numpy as np
class _GridAdapter:
"""
Temporary workaround to normalize grid-specific array access patterns.
Different grid types (DIS, DISV, DISU) store arrays in different shapes.
Adapters provide a uniformly shaped interface, (nlay, ncells_per_layer).
Currently this is used only in get_transmissivities(). It should not be
necessary anymore in 4.x.
"""
def __init__(self, model, dis_package, nlay):
self.model = model
self.dis = dis_package
self.nlay = nlay
def reshape_if_needed(self, array, target_shape):
"""Reshape 1D array to 2D if needed."""
if array.ndim == 1 and len(target_shape) == 2:
return array.reshape(target_shape)
return array
def get_k_array(self, paklist):
"""
Return hydraulic conductivity array in shape (nlay, ncells_per_layer).
Parameters
----------
paklist : list
List of package names in the model
"""
# Check for flow packages in order of preference
if "LPF" in paklist:
return self.model.lpf.hk.array
elif "UPW" in paklist:
return self.model.upw.hk.array
elif "NPF" in paklist:
return self._get_npf_k_array()
else:
raise ValueError("No LPF, UPW, or NPF package.")
def _get_npf_k_array(self):
"""Get NPF k array. Subclasses may override for grid-specific handling."""
return self.model.npf.k.array
def get_bottom_array(self):
"""Return bottom elevation array in shape (nlay, ncells_per_layer)."""
raise NotImplementedError
def get_top_for_slice(self, indices, grid_type):
"""Return model top elevation for the given cell indices."""
raise NotImplementedError
def get_layer_tops(self, botm_sliced, indices, grid_type):
"""Return top elevation for each layer at the given indices."""
raise NotImplementedError
def normalize_heads_array(self, heads):
"""
Normalize heads array to (nlay, ncells_per_layer) shape if needed.
Parameters
----------
heads : ndarray
Heads array in any valid format for this grid type
Returns
-------
ndarray
Heads array in (nlay, ncells_per_layer) shape
"""
# Default: no normalization needed
return heads
class _DisAdapter(_GridAdapter):
"""Adapter for structured (DIS) grids."""
def get_bottom_array(self):
return self.dis.botm.array
def get_top_for_slice(self, indices, grid_type):
return self.dis.top.array[indices]
def get_layer_tops(self, botm_sliced, indices, grid_type):
tops = np.empty_like(botm_sliced, dtype=float)
tops[0, :] = self.dis.top.array[indices]
tops[1:, :] = botm_sliced[:-1]
return tops
class _DisvAdapter(_GridAdapter):
"""Adapter for vertex (DISV) grids."""
def get_bottom_array(self):
return self.dis.botm.array
def get_top_for_slice(self, indices, grid_type):
# DISV top is (ncpl,), indices is a tuple with one element
return self.dis.top.array[indices[0]]
def get_layer_tops(self, botm_sliced, indices, grid_type):
tops = np.empty_like(botm_sliced, dtype=float)
tops[0, :] = self.dis.top.array[indices[0]]
tops[1:, :] = botm_sliced[:-1]
return tops
class _DisuAdapter(_GridAdapter):
"""Adapter for unstructured (DISU) grids."""
def _get_npf_k_array(self):
"""Get NPF k array, reshaping from (nodes,) to (nlay, ncpl) if needed."""
k_array = self.model.npf.k.array
if k_array.ndim == 1:
return k_array.reshape((self.nlay, -1))
return k_array
def get_bottom_array(self):
bot_array = self.dis.bot.array
if bot_array.ndim == 1:
return bot_array.reshape((self.nlay, -1))
return bot_array
def get_top_for_slice(self, indices, grid_type):
# DISU top is per-node (nodes,), reshape to (nlay, ncpl) and get layer 0
top_array = self.dis.top.array
if top_array.ndim == 1:
top_array = top_array.reshape((self.nlay, -1))
return top_array[0, indices[0]]
def get_layer_tops(self, botm_sliced, indices, grid_type):
# DISU has per-node tops, so each layer has different top values
tops = np.empty_like(botm_sliced, dtype=float)
top_array = self.dis.top.array
if top_array.ndim == 1:
top_array = top_array.reshape((self.nlay, -1))
tops[0, :] = top_array[0, indices[0]]
tops[1:, :] = botm_sliced[:-1]
return tops
def normalize_heads_array(self, heads):
"""Normalize heads from flat (nnodes,) to (nlay, ncpl) if needed."""
if heads.ndim == 1:
return heads.reshape((self.nlay, -1))
return heads
def _get_grid_adapter(model, nlay):
"""
Get the appropriate grid adapter for the model.
Parameters
----------
model : flopy.modflow.Modflow or flopy.mf6.ModflowGwf object
Model object
nlay : int
Number of layers
Returns
-------
adapter : _GridAdapter subclass instance
Grid-specific adapter
dis_package : discretization package
The DIS, DISV, or DISU package
"""
paklist = model.get_package_list()
if "DISU" in paklist:
return _DisuAdapter(model, model.disu, nlay)
elif "DISV" in paklist:
return _DisvAdapter(model, model.disv, nlay)
elif "DIS" in paklist:
return _DisAdapter(model, model.dis, nlay)
else:
# For older MODFLOW versions, default to DIS adapter
return _DisAdapter(model, model.dis, nlay)
[docs]def get_transmissivities(
heads,
m,
r=None,
c=None,
x=None,
y=None,
sctop=None,
scbot=None,
nodata=-999,
):
"""
Computes transmissivity in each model layer at specified locations and
open intervals. A saturated thickness is determined for each row, column
or x, y location supplied, based on the open interval (sctop, scbot),
if supplied, otherwise the layer tops and bottoms and the water table
are used.
Parameters
----------
heads : 2D array OR 3D array
numpy array of shape nlay by n locations (2D) OR complete heads array
with the correct shape for structured grids (nlay, nrow, ncol) or for
vertex grids (nlay, ncpl) or unstructured grids (nnodes).
If None, the system is assumed to be fully saturated (heads at layer tops).
m : flopy.modflow.Modflow or flopy.mf6.ModflowGwf object
Must have dis, disv, or disu and lpf, upw, or npf packages.
r : 1D array-like of ints, of length n locations
row indices (optional; alternately specify x, y).
Only valid for structured grids.
c : 1D array-like of ints, of length n locations
column indices (optional; alternately specify x, y).
Only valid for structured grids.
x : 1D array-like of floats, of length n locations
x locations in real world coordinates (optional).
y : 1D array-like of floats, of length n locations
y locations in real world coordinates (optional).
sctop : 1D array-like of floats, of length n locations
open interval tops (optional; default is model top)
scbot : 1D array-like of floats, of length n locations
open interval bottoms (optional; default is model bottom)
nodata : numeric
optional; locations where heads=nodata will be assigned T=0
Returns
-------
T : 2D array of same shape as heads (nlay x n locations)
Transmissivities in each layer at each location
Notes
-----
For structured grids, locations can be specified with either:
- r, c (row, column indices)
- x, y (real world coordinates)
For vertex and unstructured grids, only x, y coordinates are supported.
When heads=None, the system is interpreted as fully saturated, with heads
set to the model top for all layers. This represents maximum possible
saturation and is useful for calculating maximum transmissivities.
Examples
--------
>>> T = get_transmissivities(heads, model, r=[0, 1], c=[0, 1])
>>> T = get_transmissivities(heads, model, x=[100.0, 200.0], y=[50.0, 150.0])
>>> T = get_transmissivities(None, model, r=[0, 1], c=[0, 1]) # fully saturated
"""
# get grid dims
if (modelgrid := getattr(m, "modelgrid", None)) is not None:
grid_type = modelgrid.grid_type
nlay = m.modelgrid.nlay
if grid_type == "structured":
nrow = m.modelgrid.nrow
ncol = m.modelgrid.ncol
elif (ncpl := getattr(m.modelgrid, "ncpl", None)) is None:
raise ValueError(f"Unsupported grid type: {grid_type}")
else:
grid_type = "structured"
nlay = m.nlay
nrow = m.nrow
ncol = m.ncol
# get grid adapter
adapter = _get_grid_adapter(m, nlay)
# get slicing indices
if r is not None and c is not None:
if grid_type != "structured":
raise ValueError("r, c parameters only valid for structured grids")
indices = (np.atleast_1d(r), np.atleast_1d(c))
elif x is not None and y is not None:
points = zip(np.atleast_1d(x), np.atleast_1d(y))
if grid_type == "structured":
indices = [m.modelgrid.intersect(xi, yi) for xi, yi in points]
indices = tuple(np.array(idcs) for idcs in zip(*indices))
else:
indices = (np.array([m.modelgrid.intersect(xi, yi) for xi, yi in points]),)
else:
raise ValueError("Must specify r, c indices or x, y locations.")
# get k array using adapter (handles all flow packages)
paklist = m.get_package_list()
k_array = adapter.get_k_array(paklist)
hk = k_array[(slice(None),) + indices]
# get and slice bottom array
botm_array = adapter.get_bottom_array()
botm = botm_array[(slice(None),) + indices]
# handle fully saturated case (heads=None)
if heads is None:
model_top = adapter.get_top_for_slice(indices, grid_type)
heads = np.tile(model_top, (nlay, 1))
# normalize and slice heads using adapter
heads = adapter.normalize_heads_array(heads)
if grid_type == "structured" and heads.shape == (nlay, nrow, ncol):
heads = heads[(slice(None),) + indices]
elif grid_type != "structured" and heads.shape == (nlay, ncpl):
heads = heads[(slice(None),) + indices]
if heads.shape != botm.shape:
raise ValueError("Shape of heads array must be nlay x nhyd")
# open interval tops/bottoms default to model top/bottom
if sctop is None:
sctop = adapter.get_top_for_slice(indices, grid_type)
if scbot is None:
scbot = botm[-1, :]
tops = adapter.get_layer_tops(botm, indices, grid_type)
# expand top and bottom arrays to be same shape as botm, thickness, etc.
# (so we have an open interval value for each layer)
sctoparr = np.zeros(botm.shape)
sctoparr[:] = sctop
scbotarr = np.zeros(botm.shape)
scbotarr[:] = scbot
# start with layer tops
# set tops above heads to heads
# set tops above screen top to screen top
# (we only care about the saturated open interval)
openinvtop = tops.copy()
openinvtop[openinvtop > heads] = heads[openinvtop > heads]
openinvtop[openinvtop > sctoparr] = sctoparr[openinvtop > sctop]
# start with layer bottoms
# set bottoms below screened interval to screened interval bottom
# set screen bottoms below bottoms to layer bottoms
openinvbotm = botm.copy()
openinvbotm[openinvbotm < scbotarr] = scbotarr[openinvbotm < scbot]
openinvbotm[scbotarr < botm] = botm[scbotarr < botm]
# compute thickness of open interval in each layer
thick = openinvtop - openinvbotm
# assign open intervals above or below model to closest cell in column
not_in_layer = np.sum(thick < 0, axis=0)
not_in_any_layer = not_in_layer == thick.shape[0]
for i, n in enumerate(not_in_any_layer):
if n:
closest = np.argmax(thick[:, i])
thick[closest, i] = 1.0
thick[thick < 0] = 0
thick[heads == nodata] = 0 # exclude nodata cells
# compute transmissivities
T = thick * hk
return T
[docs]def get_water_table(heads, hdry=-1e30, hnoflo=1e30, masked_values=None):
"""
Get a 2D array representing the water table elevation for each
stress period in heads array.
Parameters
----------
heads : 3 or 4-D np.ndarray
Heads array.
hdry : real
The head that is assigned to cells that are converted to dry
during a simulation. By default, -1e30.
hnoflo : real
The value of head assigned to all inactive (no flow) cells
throughout the simulation, including vertical pass-through cells in
MODFLOW 6. By default, 1e30.
masked_values : list
List of any values (in addition to hdry and hnoflo) that should
be masked in the water table calculation.
Returns
-------
wt : 2 or 3-D np.ndarray of water table elevations
for each stress period.
"""
heads = np.array(heads, ndmin=4)
mask = (heads == hdry) | (heads == hnoflo)
if masked_values is not None:
for val in masked_values:
mask = mask | (heads == val)
k = (~mask).argmax(axis=1)
per, i, j = np.indices(k.shape)
wt = heads[per.ravel(), k.ravel(), i.ravel(), j.ravel()].reshape(k.shape)
wt = np.squeeze(wt)
mask = (wt == hdry) | (wt == hnoflo)
if masked_values is not None:
for val in masked_values:
mask = mask | (wt == val)
wt = np.ma.masked_array(wt, mask)
return wt
[docs]def get_gradients(heads, m, nodata, per_idx=None):
"""
Calculates the hydraulic gradients from the heads
array for each stress period.
Parameters
----------
heads : 3 or 4-D np.ndarray
Heads array.
m : flopy.modflow.Modflow object
Must have a flopy.modflow.ModflowDis object attached.
nodata : real
HDRY value indicating dry cells.
per_idx : int or sequence of ints
stress periods to return. If None,
returns all stress periods (default).
Returns
-------
grad : 3 or 4-D np.ndarray
Array of hydraulic gradients
"""
# internal calculations done on a masked array
heads = np.ma.array(heads, ndmin=4, mask=heads == nodata)
nper, nlay, nrow, ncol = heads.shape
if per_idx is None:
per_idx = list(range(nper))
elif np.isscalar(per_idx):
per_idx = [per_idx]
grad = []
for per in per_idx:
hds = heads[per]
zcnt_per = np.ma.array(m.dis.zcentroids, mask=hds.mask)
unsat = zcnt_per > hds
zcnt_per[unsat] = hds[unsat]
# apply .diff on data and mask components separately
diff_mask = np.diff(hds.mask, axis=0)
dz = np.ma.array(np.diff(zcnt_per.data, axis=0), mask=diff_mask)
dh = np.ma.array(np.diff(hds.data, axis=0), mask=diff_mask)
# convert to nan-filled array, as is expected(!?)
grad.append((dh / dz).filled(np.nan))
return np.squeeze(grad)
[docs]def get_extended_budget(
cbcfile,
precision="single",
idx=None,
kstpkper=None,
totim=None,
boundary_ifaces=None,
hdsfile=None,
model=None,
):
"""
Get the flow rate across cell faces including potential stresses applied
along boundaries at a given time. Only implemented for "classical" MODFLOW
versions where the budget is recorded as FLOW RIGHT FACE, FLOW FRONT FACE
and FLOW LOWER FACE arrays.
Parameters
----------
cbcfile : str
Cell by cell file produced by Modflow.
precision : str
Binary file precision, default is 'single'.
idx : int or list
The zero-based record number.
kstpkper : tuple of ints
A tuple containing the time step and stress period (kstp, kper).
The kstp and kper values are zero based.
totim : float
The simulation time.
boundary_ifaces : dictionary {str: int or list}
A dictionary defining how to treat stress flows at boundary cells.
The keys are budget terms corresponding to stress packages (same term
as in the overall volumetric budget printed in the listing file).
The values are either a single iface number to be applied to all cells
for the stress package, or a list of lists describing individual
boundary cells in the same way as in the package input plus the iface
number appended. The iface number indicates the face to which the
stress flow is assigned, following the MODPATH convention (see MODPATH
user guide).
Example:
boundary_ifaces = {
'RECHARGE': 6,
'RIVER LEAKAGE': 6,
'CONSTANT HEAD': [[lay, row, col, iface], ...],
'WELLS': [[lay, row, col, flux, iface], ...],
'HEAD DEP BOUNDS': [[lay, row, col, head, cond, iface], ...]}.
Note: stresses that are not informed in boundary_ifaces are implicitly
treated as internally-distributed sinks/sources.
hdsfile : str
Head file produced by MODFLOW (only required if boundary_ifaces is
used).
model : flopy.modflow.Modflow object
Modflow model instance (only required if boundary_ifaces is used).
Returns
-------
(Qx_ext, Qy_ext, Qz_ext) : tuple
Flow rates across cell faces.
Qx_ext is a ndarray of size (nlay, nrow, ncol + 1).
Qy_ext is a ndarray of size (nlay, nrow + 1, ncol). The sign is such
that the y axis is considered to increase in the north direction.
Qz_ext is a ndarray of size (nlay + 1, nrow, ncol). The sign is such
that the z axis is considered to increase in the upward direction.
"""
import flopy.utils.binaryfile as bf
import flopy.utils.formattedfile as fm
# define useful stuff
if isinstance(cbcfile, bf.CellBudgetFile):
cbf = cbcfile
else:
cbf = bf.CellBudgetFile(cbcfile)
nlay, nrow, ncol = cbf.nlay, cbf.nrow, cbf.ncol
rec_names = cbf.get_unique_record_names(decode=True)
err_msg = " not found in the budget file."
# get flow across right face
Qx_ext = np.zeros((nlay, nrow, ncol + 1), dtype=np.float32)
if ncol > 1:
budget_term = "FLOW RIGHT FACE"
matched_name = [s for s in rec_names if budget_term in s]
if not matched_name:
raise RuntimeError(budget_term + err_msg)
frf = cbf.get_data(idx=idx, kstpkper=kstpkper, totim=totim, text=budget_term)
Qx_ext[:, :, 1:] = frf[0]
# SWI2 package
budget_term_swi = "SWIADDTOFRF"
matched_name_swi = [s for s in rec_names if budget_term_swi in s]
if matched_name_swi:
frf_swi = cbf.get_data(
idx=idx, kstpkper=kstpkper, totim=totim, text=budget_term_swi
)
Qx_ext[:, :, 1:] += frf_swi[0]
# get flow across front face
Qy_ext = np.zeros((nlay, nrow + 1, ncol), dtype=np.float32)
if nrow > 1:
budget_term = "FLOW FRONT FACE"
matched_name = [s for s in rec_names if budget_term in s]
if not matched_name:
raise RuntimeError(budget_term + err_msg)
fff = cbf.get_data(idx=idx, kstpkper=kstpkper, totim=totim, text=budget_term)
Qy_ext[:, 1:, :] = -fff[0]
# SWI2 package
budget_term_swi = "SWIADDTOFFF"
matched_name_swi = [s for s in rec_names if budget_term_swi in s]
if matched_name_swi:
fff_swi = cbf.get_data(
idx=idx, kstpkper=kstpkper, totim=totim, text=budget_term_swi
)
Qy_ext[:, 1:, :] -= fff_swi[0]
# get flow across lower face
Qz_ext = np.zeros((nlay + 1, nrow, ncol), dtype=np.float32)
if nlay > 1:
budget_term = "FLOW LOWER FACE"
matched_name = [s for s in rec_names if budget_term in s]
if not matched_name:
raise RuntimeError(budget_term + err_msg)
flf = cbf.get_data(idx=idx, kstpkper=kstpkper, totim=totim, text=budget_term)
Qz_ext[1:, :, :] = -flf[0]
# SWI2 package
budget_term_swi = "SWIADDTOFLF"
matched_name_swi = [s for s in rec_names if budget_term_swi in s]
if matched_name_swi:
flf_swi = cbf.get_data(
idx=idx, kstpkper=kstpkper, totim=totim, text=budget_term_swi
)
Qz_ext[1:, :, :] -= flf_swi[0]
# deal with boundary cells
if boundary_ifaces is not None:
# need calculated heads for some stresses and to check hnoflo and hdry
if hdsfile is None:
raise ValueError("hdsfile must be provided when using boundary_ifaces")
if isinstance(hdsfile, (bf.HeadFile, fm.FormattedHeadFile)):
hds = hdsfile
else:
try:
hds = bf.HeadFile(hdsfile)
except:
hds = fm.FormattedHeadFile(hdsfile, precision=precision)
head = hds.get_data(idx=idx, kstpkper=kstpkper, totim=totim)
# get hnoflo and hdry values
if model is None:
raise ValueError("model must be provided when using boundary_ifaces")
noflo_or_dry = np.logical_or(head == model.hnoflo, head == model.hdry)
for budget_term, iface_info in boundary_ifaces.items():
# look for budget term in budget file
matched_name = [s for s in rec_names if budget_term in s]
if not matched_name:
raise RuntimeError(
f'Budget term {budget_term} not found in "{cbcfile}" file.'
)
if len(matched_name) > 1:
raise RuntimeError(
"Budget term " + budget_term + " found"
" in several record names. Use a more "
" precise name."
)
Q_stress = cbf.get_data(
idx=idx,
kstpkper=kstpkper,
totim=totim,
text=matched_name[0],
full3D=True,
)[0]
# remove potential leading and trailing spaces
budget_term = budget_term.strip()
# weirdly, MODFLOW puts recharge in all potential recharge cells
# and not only the actual cells; thus, correct this by putting 0
# away from water table cells
if budget_term == "RECHARGE":
# find the water table as the first active cell in each column
water_table = np.full((nlay, nrow, ncol), False)
water_table[0, :, :] = np.logical_not(noflo_or_dry[0, :, :])
already_found = water_table[0, :, :]
for lay in range(1, nlay):
if np.sum(already_found) == nrow * ncol:
break
water_table[lay, :, :] = np.logical_and(
np.logical_not(noflo_or_dry[lay, :, :]),
np.logical_not(already_found),
)
already_found = np.logical_or(already_found, water_table[lay, :, :])
Q_stress[np.logical_not(water_table)] = 0.0
# case where the same iface is assigned to all cells
if isinstance(iface_info, int):
if iface_info == 1:
Qx_ext[:, :, :-1] += Q_stress
elif iface_info == 2:
Qx_ext[:, :, 1:] -= Q_stress
elif iface_info == 3:
Qy_ext[:, 1:, :] += Q_stress
elif iface_info == 4:
Qy_ext[:, :-1, :] -= Q_stress
elif iface_info == 5:
Qz_ext[1:, :, :] += Q_stress
elif iface_info == 6:
Qz_ext[:-1, :, :] -= Q_stress
# case where iface is assigned individually per cell
elif isinstance(iface_info, list):
# impose a unique iface (normally = 6) for some stresses
# (note: UZF RECHARGE, GW ET and SURFACE LEAKAGE are all
# related to the UZF package)
if (
budget_term == "RECHARGE"
or budget_term == "ET"
or budget_term == "UZF RECHARGE"
or budget_term == "GW ET"
or budget_term == "SURFACE LEAKAGE"
):
raise ValueError(
"This function imposes the use of a "
"unique iface (normally = 6) for the "
"{} budget term.".format(budget_term)
)
# loop through boundary cells
for cell_info in iface_info:
lay, row, col = cell_info[0], cell_info[1], cell_info[2]
if noflo_or_dry[lay, row, col]:
continue
iface = cell_info[-1]
# Here, where appropriate, we recalculate Q_stress_cell
# using package input. This gives more flexibility than
# directly taking the value saved by MODFLOW. Indeed, it
# allows for a same type of stress to be applied several
# times to the same cell but to different faces
# (whereas MODFLOW only saves one lumped value per
# stress type per cell).
# Note: this flexibility is not supported for:
# - FHB package (we would need to interpolate inputs
# across time steps as done in the package; complicated)
# - RES package (we would need to interpolate inputs
# across time steps as done in the package; complicated)
# - STR package (we would need first to retrieve river
# stage from model outputs; complicated)
# - SFR1 package (we would need to retrieve river
# stage and conductance from model outputs; complicated)
# - SFR2 package (even more complicated than SFR1)
# - LAK3 package (we would need to retrieve lake
# stage and conductance from model outputs; complicated)
# - MNW1 package (we would need to retrieve well head and
# conductance from model outputs; complicated)
# - MNW2 package (even more complicated than MNW1)
if budget_term == "WELLS":
Q_stress_cell = cell_info[3]
elif budget_term == "HEAD DEP BOUNDS":
ghb_head = cell_info[3]
ghb_cond = cell_info[4]
model_head = head[lay, row, col]
Q_stress_cell = ghb_cond * (ghb_head - model_head)
elif budget_term == "RIVER LEAKAGE":
riv_stage = cell_info[3]
riv_cond = cell_info[4]
riv_rbot = cell_info[5]
model_head = head[lay, row, col]
if model_head > riv_rbot:
Q_stress_cell = riv_cond * (riv_stage - model_head)
else:
Q_stress_cell = riv_cond * (riv_stage - riv_rbot)
elif budget_term == "DRAINS":
drn_stage = cell_info[3]
drn_cond = cell_info[4]
model_head = head[lay, row, col]
if model_head > drn_stage:
Q_stress_cell = drn_cond * (drn_stage - model_head)
else:
continue
# Else, take the value saved by MODFLOW.
# This includes the budget terms:
# - 'CONSTANT HEAD' for:
# * head specified through -1 in IBOUND
# * head specified in CHD package
# * head specified in FHB package
# - 'SPECIFIED FLOWS' for flow specified in FHB package
# - 'RESERV. LEAKAGE' for RES package
# - 'STREAM LEAKAGE' for:
# * STR package
# * SFR1 package
# * SFR2 package
# - 'LAKE SEEPAGE' for LAK3 package
# - 'MNW' for MNW1 package
# - 'MNW2' for MNW2 package
# - 'SWIADDTOCH' for SWI2 package
else:
Q_stress_cell = Q_stress[lay, row, col]
if iface == 1:
Qx_ext[lay, row, col] += Q_stress_cell
elif iface == 2:
Qx_ext[lay, row, col + 1] -= Q_stress_cell
elif iface == 3:
Qy_ext[lay, row + 1, col] += Q_stress_cell
elif iface == 4:
Qy_ext[lay, row, col] -= Q_stress_cell
elif iface == 5:
Qz_ext[lay + 1, row, col] += Q_stress_cell
elif iface == 6:
Qz_ext[lay, row, col] -= Q_stress_cell
else:
raise TypeError("boundary_ifaces value must be either int or list.")
return Qx_ext, Qy_ext, Qz_ext
[docs]def get_specific_discharge(
vectors,
model,
head=None,
position="centers",
):
"""
Get the discharge vector at cell centers at a given time. For "classical"
MODFLOW versions, we calculate it from the flow rate across cell faces.
For MODFLOW 6, we directly take it from MODFLOW output (this requires
setting the option "save_specific_discharge" in the NPF package).
Parameters
----------
vectors : tuple, np.recarray
either a tuple of (flow right face, flow front face, flow lower face)
numpy arrays from a MODFLOW-2005 compatible Cell Budget File
or
a specific discharge recarray from a MODFLOW 6 Cell Budget File
model : object
flopy model object
head : np.ndarray
numpy array of head values for a specific model
position : str
Position at which the specific discharge will be calculated. Possible
values are "centers" (default), "faces" and "vertices".
Returns
-------
(qx, qy, qz) : tuple
Discharge vector.
qx, qy, qz are ndarrays of size (nlay, nrow, ncol) for a structured
grid or size (nlay, ncpl) for an unstructured grid.
The sign of qy is such that the y axis is considered to increase
in the north direction.
The sign of qz is such that the z axis is considered to increase
in the upward direction.
Note: if a head array is provided, inactive and dry cells are
set to NaN.
"""
classical_budget = False
spdis, tqx, tqy, tqz = None, None, None, None
modelgrid = model.modelgrid
if head is not None:
head.shape = modelgrid.shape
if isinstance(vectors, (list, tuple)):
classical_budget = True
for ix, vector in enumerate(vectors):
if vector is None:
continue
else:
tshp = list(modelgrid.shape)[::-1]
tshp[ix] += 1
ext_shape = tuple(tshp[::-1])
break
if vectors[ix].shape == modelgrid.shape:
tqx = np.zeros(
(modelgrid.nlay, modelgrid.nrow, modelgrid.ncol + 1), dtype=np.float32
)
tqy = np.zeros(
(modelgrid.nlay, modelgrid.nrow + 1, modelgrid.ncol), dtype=np.float32
)
tqz = np.zeros(
(modelgrid.nlay + 1, modelgrid.nrow, modelgrid.ncol), dtype=np.float32
)
if vectors[0] is not None:
tqx[:, :, 1:] = vectors[0]
if vectors[1] is not None:
tqy[:, 1:, :] = -vectors[1]
if len(vectors) > 2 and vectors[2] is not None:
tqz[1:, :, :] = -vectors[2]
elif vectors[ix].shape == ext_shape:
if vectors[0] is not None:
tqx = vectors[0]
if vectors[1] is not None:
tqy = vectors[1]
if len(vectors) > 2 and vectors[2] is not None:
tqz = vectors[2]
else:
raise IndexError(
"Classical budget components must have the same shape as the modelgrid"
)
else:
spdis = vectors
if classical_budget:
# get saturated thickness (head - bottom elev for unconfined layer)
if head is None:
saturated_thickness = modelgrid.remove_confining_beds(
modelgrid.cell_thickness
)
else:
saturated_thickness = modelgrid.saturated_thickness(
head, mask=[model.hdry, model.hnoflo]
)
saturated_thickness.shape = modelgrid.shape
# inform modelgrid of no-flow and dry cells
modelgrid = model.modelgrid
if modelgrid._idomain is None:
modelgrid._idomain = model.dis.ibound
if head is not None:
noflo_or_dry = np.logical_or(head == model.hnoflo, head == model.hdry)
modelgrid._idomain[noflo_or_dry] = 0
# get cross section areas along x
delc = np.reshape(modelgrid.delc, (1, modelgrid.nrow, 1))
cross_area_x = delc * saturated_thickness
# get cross section areas along y
delr = np.reshape(modelgrid.delr, (1, 1, modelgrid.ncol))
cross_area_y = delr * saturated_thickness
# get cross section areas along z
cross_area_z = np.ones(modelgrid.shape) * delc * delr
# calculate qx, qy, qz
if position == "centers":
qx = np.zeros(modelgrid.shape, dtype=np.float32)
qy = np.zeros(modelgrid.shape, dtype=np.float32)
cross_area_x = (
delc[:]
* 0.5
* (saturated_thickness[:, :, :-1] + saturated_thickness[:, :, 1:])
)
cross_area_y = (
delr
* 0.5
* (saturated_thickness[:, 1:, :] + saturated_thickness[:, :-1, :])
)
qx[:, :, 1:] = 0.5 * (tqx[:, :, 2:] + tqx[:, :, 1:-1]) / cross_area_x
qx[:, :, 0] = 0.5 * tqx[:, :, 1] / cross_area_x[:, :, 0]
qy[:, 1:, :] = 0.5 * (tqy[:, 2:, :] + tqy[:, 1:-1, :]) / cross_area_y
qy[:, 0, :] = 0.5 * tqy[:, 1, :] / cross_area_y[:, 0, :]
qz = 0.5 * (tqz[1:, :, :] + tqz[:-1, :, :]) / cross_area_z
elif position == "faces" or position == "vertices":
cross_area_x = modelgrid.array_at_faces(cross_area_x, "x")
cross_area_y = modelgrid.array_at_faces(cross_area_y, "y")
cross_area_z = modelgrid.array_at_faces(cross_area_z, "z")
qx = tqx / cross_area_x
qy = tqy / cross_area_y
qz = tqz / cross_area_z
else:
raise ValueError(f'"{position}" is not a valid value for position')
if position == "vertices":
qx = modelgrid.array_at_verts(qx)
qy = modelgrid.array_at_verts(qy)
qz = modelgrid.array_at_verts(qz)
else:
if position != "centers":
raise ValueError(
f"MF6 vectors cannot be calculated at {position}, 'centers' "
f"is the only supported option"
)
nnodes = model.modelgrid.nnodes
qx = np.full((nnodes), np.nan, dtype=np.float64)
qy = np.full((nnodes), np.nan, dtype=np.float64)
qz = np.full((nnodes), np.nan, dtype=np.float64)
idx = np.array(spdis["node"]) - 1
qx[idx] = spdis["qx"]
qy[idx] = spdis["qy"]
qz[idx] = spdis["qz"]
qx.shape = modelgrid.shape
qy.shape = modelgrid.shape
qz.shape = modelgrid.shape
# set no-flow and dry cells to NaN
if head is not None and position == "centers":
noflo_or_dry = np.logical_or(head == model.hnoflo, head == model.hdry)
qx[noflo_or_dry] = np.nan
qy[noflo_or_dry] = np.nan
qz[noflo_or_dry] = np.nan
return qx, qy, qz