Source code for flopy.utils.postprocessing

import numpy as np


[docs]def get_transmissivities( heads, m, r=None, c=None, x=None, y=None, sctop=None, scbot=None, nodata=-999, ): """ Computes transmissivity in each model layer at specified locations and open intervals. A saturated thickness is determined for each row, column or x, y location supplied, based on the open interval (sctop, scbot), if supplied, otherwise the layer tops and bottoms and the water table are used. Parameters ---------- heads : 2D array OR 3D array numpy array of shape nlay by n locations (2D) OR complete heads array of the model for one time (3D) m : flopy.modflow.Modflow object Must have dis and lpf or upw packages. r : 1D array-like of ints, of length n locations row indices (optional; alternately specify x, y) c : 1D array-like of ints, of length n locations column indices (optional; alternately specify x, y) x : 1D array-like of floats, of length n locations x locations in real world coordinates (optional) y : 1D array-like of floats, of length n locations y locations in real world coordinates (optional) sctop : 1D array-like of floats, of length n locations open interval tops (optional; default is model top) scbot : 1D array-like of floats, of length n locations open interval bottoms (optional; default is model bottom) nodata : numeric optional; locations where heads=nodata will be assigned T=0 Returns ------- T : 2D array of same shape as heads (nlay x n locations) Transmissivities in each layer at each location """ if r is not None and c is not None: pass elif x is not None and y is not None: # get row, col for observation locations r, c = m.modelgrid.intersect(x, y) else: raise ValueError("Must specify row, column or x, y locations.") # get k-values and botms at those locations paklist = m.get_package_list() if "LPF" in paklist: hk = m.lpf.hk.array[:, r, c] elif "UPW" in paklist: hk = m.upw.hk.array[:, r, c] else: raise ValueError("No LPF or UPW package.") botm = m.dis.botm.array[:, r, c] if heads.shape == (m.nlay, m.nrow, m.ncol): heads = heads[:, r, c] msg = "Shape of heads array must be nlay x nhyd" assert heads.shape == botm.shape, msg # set open interval tops/bottoms to model top/bottom if None if sctop is None: sctop = m.dis.top.array[r, c] if scbot is None: scbot = m.dis.botm.array[-1, r, c] # make an array of layer tops tops = np.empty_like(botm, dtype=float) tops[0, :] = m.dis.top.array[r, c] tops[1:, :] = botm[:-1] # expand top and bottom arrays to be same shape as botm, thickness, etc. # (so we have an open interval value for each layer) sctoparr = np.zeros(botm.shape) sctoparr[:] = sctop scbotarr = np.zeros(botm.shape) scbotarr[:] = scbot # start with layer tops # set tops above heads to heads # set tops above screen top to screen top # (we only care about the saturated open interval) openinvtop = tops.copy() openinvtop[openinvtop > heads] = heads[openinvtop > heads] openinvtop[openinvtop > sctoparr] = sctoparr[openinvtop > sctop] # start with layer bottoms # set bottoms below screened interval to screened interval bottom # set screen bottoms below bottoms to layer bottoms openinvbotm = botm.copy() openinvbotm[openinvbotm < scbotarr] = scbotarr[openinvbotm < scbot] openinvbotm[scbotarr < botm] = botm[scbotarr < botm] # compute thickness of open interval in each layer thick = openinvtop - openinvbotm # assign open intervals above or below model to closest cell in column not_in_layer = np.sum(thick < 0, axis=0) not_in_any_layer = not_in_layer == thick.shape[0] for i, n in enumerate(not_in_any_layer): if n: closest = np.argmax(thick[:, i]) thick[closest, i] = 1.0 thick[thick < 0] = 0 thick[heads == nodata] = 0 # exclude nodata cells # compute transmissivities T = thick * hk return T
[docs]def get_water_table(heads, hdry=-1e30, hnoflo=1e30, masked_values=None): """ Get a 2D array representing the water table elevation for each stress period in heads array. Parameters ---------- heads : 3 or 4-D np.ndarray Heads array. hdry : real The head that is assigned to cells that are converted to dry during a simulation. By default, -1e30. hnoflo : real The value of head assigned to all inactive (no flow) cells throughout the simulation, including vertical pass-through cells in MODFLOW 6. By default, 1e30. masked_values : list List of any values (in addition to hdry and hnoflo) that should be masked in the water table calculation. Returns ------- wt : 2 or 3-D np.ndarray of water table elevations for each stress period. """ heads = np.array(heads, ndmin=4) mask = (heads == hdry) | (heads == hnoflo) if masked_values is not None: for val in masked_values: mask = mask | (heads == val) k = (~mask).argmax(axis=1) per, i, j = np.indices(k.shape) wt = heads[per.ravel(), k.ravel(), i.ravel(), j.ravel()].reshape(k.shape) wt = np.squeeze(wt) mask = (wt == hdry) | (wt == hnoflo) if masked_values is not None: for val in masked_values: mask = mask | (wt == val) wt = np.ma.masked_array(wt, mask) return wt
[docs]def get_gradients(heads, m, nodata, per_idx=None): """ Calculates the hydraulic gradients from the heads array for each stress period. Parameters ---------- heads : 3 or 4-D np.ndarray Heads array. m : flopy.modflow.Modflow object Must have a flopy.modflow.ModflowDis object attached. nodata : real HDRY value indicating dry cells. per_idx : int or sequence of ints stress periods to return. If None, returns all stress periods (default). Returns ------- grad : 3 or 4-D np.ndarray Array of hydraulic gradients """ # internal calculations done on a masked array heads = np.ma.array(heads, ndmin=4, mask=heads == nodata) nper, nlay, nrow, ncol = heads.shape if per_idx is None: per_idx = list(range(nper)) elif np.isscalar(per_idx): per_idx = [per_idx] grad = [] for per in per_idx: hds = heads[per] zcnt_per = np.ma.array(m.dis.zcentroids, mask=hds.mask) unsat = zcnt_per > hds zcnt_per[unsat] = hds[unsat] # apply .diff on data and mask components separately diff_mask = np.diff(hds.mask, axis=0) dz = np.ma.array(np.diff(zcnt_per.data, axis=0), mask=diff_mask) dh = np.ma.array(np.diff(hds.data, axis=0), mask=diff_mask) # convert to nan-filled array, as is expected(!?) grad.append((dh / dz).filled(np.nan)) return np.squeeze(grad)
[docs]def get_extended_budget( cbcfile, precision="single", idx=None, kstpkper=None, totim=None, boundary_ifaces=None, hdsfile=None, model=None, ): """ Get the flow rate across cell faces including potential stresses applied along boundaries at a given time. Only implemented for "classical" MODFLOW versions where the budget is recorded as FLOW RIGHT FACE, FLOW FRONT FACE and FLOW LOWER FACE arrays. Parameters ---------- cbcfile : str Cell by cell file produced by Modflow. precision : str Binary file precision, default is 'single'. idx : int or list The zero-based record number. kstpkper : tuple of ints A tuple containing the time step and stress period (kstp, kper). The kstp and kper values are zero based. totim : float The simulation time. boundary_ifaces : dictionary {str: int or list} A dictionary defining how to treat stress flows at boundary cells. The keys are budget terms corresponding to stress packages (same term as in the overall volumetric budget printed in the listing file). The values are either a single iface number to be applied to all cells for the stress package, or a list of lists describing individual boundary cells in the same way as in the package input plus the iface number appended. The iface number indicates the face to which the stress flow is assigned, following the MODPATH convention (see MODPATH user guide). Example: boundary_ifaces = { 'RECHARGE': 6, 'RIVER LEAKAGE': 6, 'CONSTANT HEAD': [[lay, row, col, iface], ...], 'WELLS': [[lay, row, col, flux, iface], ...], 'HEAD DEP BOUNDS': [[lay, row, col, head, cond, iface], ...]}. Note: stresses that are not informed in boundary_ifaces are implicitly treated as internally-distributed sinks/sources. hdsfile : str Head file produced by MODFLOW (only required if boundary_ifaces is used). model : flopy.modflow.Modflow object Modflow model instance (only required if boundary_ifaces is used). Returns ------- (Qx_ext, Qy_ext, Qz_ext) : tuple Flow rates across cell faces. Qx_ext is a ndarray of size (nlay, nrow, ncol + 1). Qy_ext is a ndarray of size (nlay, nrow + 1, ncol). The sign is such that the y axis is considered to increase in the north direction. Qz_ext is a ndarray of size (nlay + 1, nrow, ncol). The sign is such that the z axis is considered to increase in the upward direction. """ import flopy.utils.binaryfile as bf import flopy.utils.formattedfile as fm # define useful stuff if isinstance(cbcfile, bf.CellBudgetFile): cbf = cbcfile else: cbf = bf.CellBudgetFile(cbcfile) nlay, nrow, ncol = cbf.nlay, cbf.nrow, cbf.ncol rec_names = cbf.get_unique_record_names(decode=True) err_msg = " not found in the budget file." # get flow across right face Qx_ext = np.zeros((nlay, nrow, ncol + 1), dtype=np.float32) if ncol > 1: budget_term = "FLOW RIGHT FACE" matched_name = [s for s in rec_names if budget_term in s] if not matched_name: raise RuntimeError(budget_term + err_msg) frf = cbf.get_data( idx=idx, kstpkper=kstpkper, totim=totim, text=budget_term ) Qx_ext[:, :, 1:] = frf[0] # SWI2 package budget_term_swi = "SWIADDTOFRF" matched_name_swi = [s for s in rec_names if budget_term_swi in s] if matched_name_swi: frf_swi = cbf.get_data( idx=idx, kstpkper=kstpkper, totim=totim, text=budget_term_swi ) Qx_ext[:, :, 1:] += frf_swi[0] # get flow across front face Qy_ext = np.zeros((nlay, nrow + 1, ncol), dtype=np.float32) if nrow > 1: budget_term = "FLOW FRONT FACE" matched_name = [s for s in rec_names if budget_term in s] if not matched_name: raise RuntimeError(budget_term + err_msg) fff = cbf.get_data( idx=idx, kstpkper=kstpkper, totim=totim, text=budget_term ) Qy_ext[:, 1:, :] = -fff[0] # SWI2 package budget_term_swi = "SWIADDTOFFF" matched_name_swi = [s for s in rec_names if budget_term_swi in s] if matched_name_swi: fff_swi = cbf.get_data( idx=idx, kstpkper=kstpkper, totim=totim, text=budget_term_swi ) Qy_ext[:, 1:, :] -= fff_swi[0] # get flow across lower face Qz_ext = np.zeros((nlay + 1, nrow, ncol), dtype=np.float32) if nlay > 1: budget_term = "FLOW LOWER FACE" matched_name = [s for s in rec_names if budget_term in s] if not matched_name: raise RuntimeError(budget_term + err_msg) flf = cbf.get_data( idx=idx, kstpkper=kstpkper, totim=totim, text=budget_term ) Qz_ext[1:, :, :] = -flf[0] # SWI2 package budget_term_swi = "SWIADDTOFLF" matched_name_swi = [s for s in rec_names if budget_term_swi in s] if matched_name_swi: flf_swi = cbf.get_data( idx=idx, kstpkper=kstpkper, totim=totim, text=budget_term_swi ) Qz_ext[1:, :, :] -= flf_swi[0] # deal with boundary cells if boundary_ifaces is not None: # need calculated heads for some stresses and to check hnoflo and hdry if hdsfile is None: raise ValueError( "hdsfile must be provided when using boundary_ifaces" ) if isinstance(hdsfile, (bf.HeadFile, fm.FormattedHeadFile)): hds = hdsfile else: try: hds = bf.HeadFile(hdsfile) except: hds = fm.FormattedHeadFile(hdsfile, precision=precision) head = hds.get_data(idx=idx, kstpkper=kstpkper, totim=totim) # get hnoflo and hdry values if model is None: raise ValueError( "model must be provided when using boundary_ifaces" ) noflo_or_dry = np.logical_or(head == model.hnoflo, head == model.hdry) for budget_term, iface_info in boundary_ifaces.items(): # look for budget term in budget file matched_name = [s for s in rec_names if budget_term in s] if not matched_name: raise RuntimeError( f'Budget term {budget_term} not found in "{cbcfile}" file.' ) if len(matched_name) > 1: raise RuntimeError( "Budget term " + budget_term + " found" " in several record names. Use a more " " precise name." ) Q_stress = cbf.get_data( idx=idx, kstpkper=kstpkper, totim=totim, text=matched_name[0], full3D=True, )[0] # remove potential leading and trailing spaces budget_term = budget_term.strip() # weirdly, MODFLOW puts recharge in all potential recharge cells # and not only the actual cells; thus, correct this by putting 0 # away from water table cells if budget_term == "RECHARGE": # find the water table as the first active cell in each column water_table = np.full((nlay, nrow, ncol), False) water_table[0, :, :] = np.logical_not(noflo_or_dry[0, :, :]) already_found = water_table[0, :, :] for lay in range(1, nlay): if np.sum(already_found) == nrow * ncol: break water_table[lay, :, :] = np.logical_and( np.logical_not(noflo_or_dry[lay, :, :]), np.logical_not(already_found), ) already_found = np.logical_or( already_found, water_table[lay, :, :] ) Q_stress[np.logical_not(water_table)] = 0.0 # case where the same iface is assigned to all cells if isinstance(iface_info, int): if iface_info == 1: Qx_ext[:, :, :-1] += Q_stress elif iface_info == 2: Qx_ext[:, :, 1:] -= Q_stress elif iface_info == 3: Qy_ext[:, 1:, :] += Q_stress elif iface_info == 4: Qy_ext[:, :-1, :] -= Q_stress elif iface_info == 5: Qz_ext[1:, :, :] += Q_stress elif iface_info == 6: Qz_ext[:-1, :, :] -= Q_stress # case where iface is assigned individually per cell elif isinstance(iface_info, list): # impose a unique iface (normally = 6) for some stresses # (note: UZF RECHARGE, GW ET and SURFACE LEAKAGE are all # related to the UZF package) if ( budget_term == "RECHARGE" or budget_term == "ET" or budget_term == "UZF RECHARGE" or budget_term == "GW ET" or budget_term == "SURFACE LEAKAGE" ): raise ValueError( "This function imposes the use of a " "unique iface (normally = 6) for the " "{} budget term.".format(budget_term) ) # loop through boundary cells for cell_info in iface_info: lay, row, col = cell_info[0], cell_info[1], cell_info[2] if noflo_or_dry[lay, row, col]: continue iface = cell_info[-1] # Here, where appropriate, we recalculate Q_stress_cell # using package input. This gives more flexibility than # directly taking the value saved by MODFLOW. Indeed, it # allows for a same type of stress to be applied several # times to the same cell but to different faces # (whereas MODFLOW only saves one lumped value per # stress type per cell). # Note: this flexibility is not supported for: # - FHB package (we would need to interpolate inputs # across time steps as done in the package; complicated) # - RES package (we would need to interpolate inputs # across time steps as done in the package; complicated) # - STR package (we would need first to retrieve river # stage from model outputs; complicated) # - SFR1 package (we would need to retrieve river # stage and conductance from model outputs; complicated) # - SFR2 package (even more complicated than SFR1) # - LAK3 package (we would need to retrieve lake # stage and conductance from model outputs; complicated) # - MNW1 package (we would need to retrieve well head and # conductance from model outputs; complicated) # - MNW2 package (even more complicated than MNW1) if budget_term == "WELLS": Q_stress_cell = cell_info[3] elif budget_term == "HEAD DEP BOUNDS": ghb_head = cell_info[3] ghb_cond = cell_info[4] model_head = head[lay, row, col] Q_stress_cell = ghb_cond * (ghb_head - model_head) elif budget_term == "RIVER LEAKAGE": riv_stage = cell_info[3] riv_cond = cell_info[4] riv_rbot = cell_info[5] model_head = head[lay, row, col] if model_head > riv_rbot: Q_stress_cell = riv_cond * (riv_stage - model_head) else: Q_stress_cell = riv_cond * (riv_stage - riv_rbot) elif budget_term == "DRAINS": drn_stage = cell_info[3] drn_cond = cell_info[4] model_head = head[lay, row, col] if model_head > drn_stage: Q_stress_cell = drn_cond * (drn_stage - model_head) else: continue # Else, take the value saved by MODFLOW. # This includes the budget terms: # - 'CONSTANT HEAD' for: # * head specified through -1 in IBOUND # * head specified in CHD package # * head specified in FHB package # - 'SPECIFIED FLOWS' for flow specified in FHB package # - 'RESERV. LEAKAGE' for RES package # - 'STREAM LEAKAGE' for: # * STR package # * SFR1 package # * SFR2 package # - 'LAKE SEEPAGE' for LAK3 package # - 'MNW' for MNW1 package # - 'MNW2' for MNW2 package # - 'SWIADDTOCH' for SWI2 package else: Q_stress_cell = Q_stress[lay, row, col] if iface == 1: Qx_ext[lay, row, col] += Q_stress_cell elif iface == 2: Qx_ext[lay, row, col + 1] -= Q_stress_cell elif iface == 3: Qy_ext[lay, row + 1, col] += Q_stress_cell elif iface == 4: Qy_ext[lay, row, col] -= Q_stress_cell elif iface == 5: Qz_ext[lay + 1, row, col] += Q_stress_cell elif iface == 6: Qz_ext[lay, row, col] -= Q_stress_cell else: raise TypeError( "boundary_ifaces value must be either int or list." ) return Qx_ext, Qy_ext, Qz_ext
[docs]def get_specific_discharge( vectors, model, head=None, position="centers", ): """ Get the discharge vector at cell centers at a given time. For "classical" MODFLOW versions, we calculate it from the flow rate across cell faces. For MODFLOW 6, we directly take it from MODFLOW output (this requires setting the option "save_specific_discharge" in the NPF package). Parameters ---------- vectors : tuple, np.recarray either a tuple of (flow right face, flow front face, flow lower face) numpy arrays from a MODFLOW-2005 compatible Cell Budget File or a specific discharge recarray from a MODFLOW 6 Cell Budget File model : object flopy model object head : np.ndarray numpy array of head values for a specific model position : str Position at which the specific discharge will be calculated. Possible values are "centers" (default), "faces" and "vertices". Returns ------- (qx, qy, qz) : tuple Discharge vector. qx, qy, qz are ndarrays of size (nlay, nrow, ncol) for a structured grid or size (nlay, ncpl) for an unstructured grid. The sign of qy is such that the y axis is considered to increase in the north direction. The sign of qz is such that the z axis is considered to increase in the upward direction. Note: if a head array is provided, inactive and dry cells are set to NaN. """ classical_budget = False spdis, tqx, tqy, tqz = None, None, None, None modelgrid = model.modelgrid if head is not None: head.shape = modelgrid.shape if isinstance(vectors, (list, tuple)): classical_budget = True for ix, vector in enumerate(vectors): if vector is None: continue else: tshp = list(modelgrid.shape)[::-1] tshp[ix] += 1 ext_shape = tuple(tshp[::-1]) break if vectors[ix].shape == modelgrid.shape: tqx = np.zeros( (modelgrid.nlay, modelgrid.nrow, modelgrid.ncol + 1), dtype=np.float32, ) tqy = np.zeros( (modelgrid.nlay, modelgrid.nrow + 1, modelgrid.ncol), dtype=np.float32, ) tqz = np.zeros( (modelgrid.nlay + 1, modelgrid.nrow, modelgrid.ncol), dtype=np.float32, ) if vectors[0] is not None: tqx[:, :, 1:] = vectors[0] if vectors[1] is not None: tqy[:, 1:, :] = -vectors[1] if len(vectors) > 2 and vectors[2] is not None: tqz[1:, :, :] = -vectors[2] elif vectors[ix].shape == ext_shape: if vectors[0] is not None: tqx = vectors[0] if vectors[1] is not None: tqy = vectors[1] if len(vectors) > 2 and vectors[2] is not None: tqz = vectors[2] else: raise IndexError( "Classical budget components must have " "the same shape as the modelgrid" ) else: spdis = vectors if classical_budget: # get saturated thickness (head - bottom elev for unconfined layer) if head is None: saturated_thickness = modelgrid.remove_confining_beds( modelgrid.cell_thickness ) else: saturated_thickness = modelgrid.saturated_thickness( head, mask=[model.hdry, model.hnoflo] ) saturated_thickness.shape = modelgrid.shape # inform modelgrid of no-flow and dry cells modelgrid = model.modelgrid if modelgrid._idomain is None: modelgrid._idomain = model.dis.ibound if head is not None: noflo_or_dry = np.logical_or( head == model.hnoflo, head == model.hdry ) modelgrid._idomain[noflo_or_dry] = 0 # get cross section areas along x delc = np.reshape(modelgrid.delc, (1, modelgrid.nrow, 1)) cross_area_x = delc * saturated_thickness # get cross section areas along y delr = np.reshape(modelgrid.delr, (1, 1, modelgrid.ncol)) cross_area_y = delr * saturated_thickness # get cross section areas along z cross_area_z = np.ones(modelgrid.shape) * delc * delr # calculate qx, qy, qz if position == "centers": qx = np.zeros(modelgrid.shape, dtype=np.float32) qy = np.zeros(modelgrid.shape, dtype=np.float32) cross_area_x = ( delc[:] * 0.5 * ( saturated_thickness[:, :, :-1] + saturated_thickness[:, :, 1:] ) ) cross_area_y = ( delr * 0.5 * ( saturated_thickness[:, 1:, :] + saturated_thickness[:, :-1, :] ) ) qx[:, :, 1:] = ( 0.5 * (tqx[:, :, 2:] + tqx[:, :, 1:-1]) / cross_area_x ) qx[:, :, 0] = 0.5 * tqx[:, :, 1] / cross_area_x[:, :, 0] qy[:, 1:, :] = ( 0.5 * (tqy[:, 2:, :] + tqy[:, 1:-1, :]) / cross_area_y ) qy[:, 0, :] = 0.5 * tqy[:, 1, :] / cross_area_y[:, 0, :] qz = 0.5 * (tqz[1:, :, :] + tqz[:-1, :, :]) / cross_area_z elif position == "faces" or position == "vertices": cross_area_x = modelgrid.array_at_faces(cross_area_x, "x") cross_area_y = modelgrid.array_at_faces(cross_area_y, "y") cross_area_z = modelgrid.array_at_faces(cross_area_z, "z") qx = tqx / cross_area_x qy = tqy / cross_area_y qz = tqz / cross_area_z else: raise ValueError(f'"{position}" is not a valid value for position') if position == "vertices": qx = modelgrid.array_at_verts(qx) qy = modelgrid.array_at_verts(qy) qz = modelgrid.array_at_verts(qz) else: if position != "centers": raise ValueError( f"MF6 vectors cannot be calculated at {position}, 'centers' " f"is the only supported option" ) nnodes = model.modelgrid.nnodes qx = np.full((nnodes), np.nan, dtype=np.float64) qy = np.full((nnodes), np.nan, dtype=np.float64) qz = np.full((nnodes), np.nan, dtype=np.float64) idx = np.array(spdis["node"]) - 1 qx[idx] = spdis["qx"] qy[idx] = spdis["qy"] qz[idx] = spdis["qz"] qx.shape = modelgrid.shape qy.shape = modelgrid.shape qz.shape = modelgrid.shape # set no-flow and dry cells to NaN if head is not None and position == "centers": noflo_or_dry = np.logical_or(head == model.hnoflo, head == model.hdry) qx[noflo_or_dry] = np.nan qy[noflo_or_dry] = np.nan qz[noflo_or_dry] = np.nan return qx, qy, qz