Source code for flopy.utils.check

import os

import numpy as np
from numpy.lib import recfunctions

from ..utils.recarray_utils import recarray
from ..utils.util_array import Util3d

[docs]class check: """ Check package for common errors Parameters ---------- package : object Instance of Package class. verbose : bool Boolean flag used to determine if check method results are written to the screen level : int Check method analysis level. If level=0, summary checks are performed. If level=1, full checks are performed. property_threshold_values : dict hk : tuple Reasonable minimum/maximum hydraulic conductivity value; values below this will be flagged. Default is (1e-11, 1e5), after Bear, 1972 (see and Schwartz and Zhang (2003, Table 4.4). vka : tuple Reasonable minimum/maximum hydraulic conductivity value; Default is (1e-11, 1e5), after Bear, 1972 (see and Schwartz and Zhang (2003, Table 4.4). vkcb : tuple Reasonable minimum/maximum hydraulic conductivity value for quasi-3D confining bed; Default is (1e-11, 1e5), after Bear, 1972 (see and Schwartz and Zhang (2003, Table 4.4). sy : tuple Reasonable minimum/maximum specific yield values; Default is (0.01,0.5) after Anderson, Woessner and Hunt (2015, Table 5.2). sy : tuple Reasonable minimum/maximum specific storage values; Default is (3.3e-6, 2e-2) after Anderson, Woessner and Hunt (2015, Table 5.2). Notes ----- Anderson, M.P, Woessner, W.W. and Hunt, R.J., 2015. Applied Groundwater Modeling: Simulation of Flow and Advective Transport, Elsevier, 564p. Bear, J., 1972. Dynamics of Fluids in Porous Media. Dover Publications. Schwartz, F.W. and Zhang, H., 2003. Fundamentals of Groundwater, Wiley, 583 p. """ bc_stage_names = {"GHB": "bhead", "DRN": "elev"} # all names in lower case # only check packages when level is >= to these values # default is 0 (always check package) package_check_levels = {"sfr": 1} property_threshold_values = { "hk": (1e-11, 1e5), "k": (1e-11, 1e5), "k22": (1e-11, 1e5), # after Schwartz and Zhang, table 4.4 "hani": None, "vka": (1e-11, 1e5), "k33": (1e-11, 1e5), "vkcb": (1e-11, 1e5), "ss": (1e-6, 1e-2), "sy": (0.01, 0.5), } # which versions is pks compatible with? solver_packages = { "mf2k": ["DE4", "SIP", "SOR", "GMG", "PCG", "PCGN"], "mf2005": ["DE4", "SIP", "GMG", "PCG", "PCGN"], "mfnwt": ["DE4", "SIP", "PCG", "NWT"], "mfusg": ["SMS"], } # cells thickness less than this value will be flagged thin_cell_threshold = 1.0 def __init__( self, package, f=None, verbose=True, level=1, property_threshold_values={}, ): # allow for instantiation with model or package # if isinstance(package, BaseModel): didn't work if hasattr(package, "parent"): self.model = package.parent self.prefix = f"{[0]} PACKAGE DATA VALIDATION" else: self.model = package self.prefix = f"{} MODEL DATA VALIDATION SUMMARY" self.package = package if "structured" in self.model.__dict__: self.structured = self.model.structured else: self.structured = self.model.modelgrid.grid_type == "structured" self.verbose = verbose self.level = level self.passed = [] self.property_threshold_values.update(property_threshold_values) self.summary_array = self._get_summary_array() self.f = None if f is not None: if isinstance(f, str): if os.path.split(f)[0] == "": self.summaryfile = os.path.join(self.model.model_ws, f) else: # if a path is supplied with summary file, save there self.summaryfile = f self.f = open(self.summaryfile, "w") else: self.f = f self.txt = f"\n{self.prefix}:\n" def _add_to_summary( self, type="Warning", k=0, i=0, j=0, node=0, value=0, desc="", package=None, ): if package is None: package =[0] col_list = [type, package] col_list += [k, i, j] if self.structured else [node] col_list += [value, desc] sa = self._get_summary_array(np.array(col_list)) self.summary_array = np.append(self.summary_array, sa).view( np.recarray ) def _boolean_compare( self, array, col1, col2, level0txt="{} violations encountered.", level1txt="Violations:", sort_ascending=True, print_delimiter=" ", ): """ Compare two columns in a record array. For each row, tests if value in col1 is greater than col2. If any values in col1 are > col2, subsets array to only include rows where col1 is greater. Creates another column with differences (col1-col2), and prints the array sorted by the differences column (diff). Parameters ---------- array : record array Array with columns to compare. col1 : string Column name in array. col2 : string Column name in array. sort_ascending : T/F; default True If True, printed array will be sorted by differences in ascending order. print_delimiter : str Delimiter for printed array. Returns ------- txt : str Error messages and printed array (if .level attribute of checker is set to 1). Returns an empty string if no values in col1 are greater than col2. Notes ----- info about appending to record arrays (views vs. copies and upcoming changes to numpy): """ txt = "" array = array.copy() if isinstance(col1, np.ndarray): array = recfunctions.append_fields( array, names="tmp1", data=col1, asrecarray=True ) col1 = "tmp1" if isinstance(col2, np.ndarray): array = recfunctions.append_fields( array, names="tmp2", data=col2, asrecarray=True ) col2 = "tmp2" if isinstance(col1, tuple): array = recfunctions.append_fields( array, names=col1[0], data=col1[1], asrecarray=True ) col1 = col1[0] if isinstance(col2, tuple): array = recfunctions.append_fields( array, names=col2[0], data=col2[1], asrecarray=True ) col2 = col2[0] failed = array[col1] > array[col2] if np.any(failed): failed_info = array[failed].copy() txt += level0txt.format(len(failed_info)) + "\n" if self.level == 1: diff = failed_info[col2] - failed_info[col1] cols = [ c for c in failed_info.dtype.names if failed_info[c].sum() != 0 and c != "diff" and "tmp" not in c ] # currently failed_info[cols] results in a warning. Not sure # how to do this properly with a recarray. failed_info = recfunctions.append_fields( failed_info[cols].copy(), names="diff", data=diff, asrecarray=True, ) failed_info.sort(order="diff", axis=0) if not sort_ascending: failed_info = failed_info[::-1] txt += level1txt + "\n" txt += _print_rec_array(failed_info, delimiter=print_delimiter) txt += "\n" return txt def _get_summary_array(self, array=None): dtype = self._get_dtype() if array is None: return np.recarray((0), dtype=dtype) ra = recarray(array, dtype) # at = array.transpose() # a = np.core.records.fromarrays(at, dtype=dtype) return ra def _txt_footer( self, headertxt, txt, testname, passed=False, warning=True ): """ if len(txt) == 0 or passed: txt += 'passed.' self.passed.append(testname) elif warning: self.warnings.append(testname) else: self.errors.append(testname) if self.verbose: print(txt + '\n') self.txt += headertxt + txt + '\n' """ def _stress_period_data_valid_indices(self, stress_period_data): """Check that stress period data inds are valid for model grid.""" spd_inds_valid = self._has_cell_indices(stress_period_data) # check for BCs indices that are invalid for grid inds = self._get_cell_inds(stress_period_data) isvalid = self.isvalid(inds) if not np.all(isvalid): sa = self._list_spd_check_violations( stress_period_data, ~isvalid, error_name="invalid BC index", error_type="Error", ) self.summary_array = np.append(self.summary_array, sa).view( np.recarray ) spd_inds_valid = False self.remove_passed("BC indices valid") if spd_inds_valid: self.append_passed("BC indices valid") return spd_inds_valid def _stress_period_data_nans(self, stress_period_data, nan_excl_list): """Check for and list any nans in stress period data.""" isnan = np.array( [ np.isnan(stress_period_data[c]) for c in stress_period_data.dtype.names if not (stress_period_data.dtype[c].name == "object") and c not in nan_excl_list ] ).transpose() if np.any(isnan): row_has_nan = np.any(isnan, axis=1) sa = self._list_spd_check_violations( stress_period_data, row_has_nan, error_name="Not a number", error_type="Error", ) self.summary_array = np.append(self.summary_array, sa).view( np.recarray ) self.remove_passed("not a number (Nan) entries") else: self.append_passed("not a number (Nan) entries") def _stress_period_data_inactivecells(self, stress_period_data): """Check for and list any stress period data in cells with ibound=0.""" spd = stress_period_data inds = self._get_cell_inds(spd) msg = "BC in inactive cell" idomain = self.model.modelgrid.idomain if idomain is not None: ibnd = idomain[inds] if np.any(ibnd == 0): sa = self._list_spd_check_violations( stress_period_data, ibnd == 0, error_name=msg, error_type="Warning", ) self.summary_array = np.append(self.summary_array, sa).view( np.recarray ) self.remove_passed(f"{msg}s") else: self.append_passed(f"{msg}s") def _list_spd_check_violations( self, stress_period_data, criteria, col=None, error_name="", error_type="Warning", ): """ If criteria contains any true values, return the error_type, package name, k,i,j indices, values, and description of error for each row in stress_period_data where criteria=True. """ inds_col = self._get_cell_inds_names() # inds = stress_period_data[criteria][inds_col]\ # .reshape(stress_period_data[criteria].shape + (-1,)) # inds = np.atleast_2d(np.squeeze(inds.tolist())) inds = stress_period_data[criteria] a = self._get_cellid_cols(inds, inds_col) inds = a.view(int) inds = inds.reshape(stress_period_data[criteria].shape + (-1,)) if col is not None: v = stress_period_data[criteria][col] else: v = np.zeros(len(stress_period_data[criteria])) pn = [] * len(v) en = [error_name] * len(v) tp = [error_type] * len(v) return self._get_summary_array(np.column_stack([tp, pn, inds, v, en])) @staticmethod def _get_cellid_cols(inds, inds_col): a = inds[inds_col[0]] if len(inds_col) > 1: for n in inds_col[1:]: a = np.concatenate((a, inds[n])) return a
[docs] def append_passed(self, message): """Add a check to the passed list if it isn't already in there.""" self.passed.append(message) if message not in self.passed else None
[docs] def remove_passed(self, message): """Remove a check to the passed list if it failed in any stress period.""" self.passed.remove(message) if message in self.passed else None
[docs] def isvalid(self, inds): """Check that indices are valid for model grid Parameters ---------- inds : tuple or lists or arrays; or a 1-D array (k, i, j) for structured grids; (node) for unstructured. Returns ------- isvalid : 1-D boolean array True for each index in inds that is valid for the model grid. """ if isinstance(inds, np.ndarray): inds = [inds] mg = self.model.modelgrid if mg.grid_type == "structured" and len(inds) == 3: k = inds[0] < mg.nlay i = inds[1] < mg.nrow j = inds[2] < mg.ncol return k & i & j elif mg.grid_type == "vertex" and len(inds) == 2: lay = inds[0] < mg.nlay cpl = inds[1] < mg.ncpl return lay & cpl elif mg.grid_type == "unstructured" and len(inds) == 1: return inds[0] < mg.nnodes else: return np.zeros(inds[0].shape, dtype=bool)
[docs] def get_active(self, include_cbd=False): """Returns a boolean array of active cells for the model. Parameters ---------- include_cbd : boolean If True, active is of same dimension as the thickness array in the DIS module (includes quasi 3-D confining beds). Default False. Returns ------- active : 3-D boolean array True where active. """ mg = self.model.modelgrid if mg.grid_type == "structured": nlaycbd = mg.laycbd.sum() if include_cbd else 0 inds = (mg.nlay + nlaycbd, mg.nrow, mg.ncol) elif mg.grid_type == "vertex": inds = (mg.nlay, mg.ncpl) else: inds = mg.nnodes include_cbd = False if "BAS6" in self.model.get_package_list(): if "DIS" in self.model.get_package_list(): dis = self.model.dis else: dis = self.model.disu # make ibound of same shape as thicknesses/botm for quasi-3D models active = self.model.bas6.ibound.array != 0 if include_cbd and dis.laycbd.sum() > 0: laycbd = np.flatnonzero( dis.laycbd.array > 0 ) # cbd layer index active = np.insert(active, laycbd, active[laycbd], axis=0) else: # if bas package is missing active = np.ones(inds, dtype=bool) return active
[docs] def print_summary(self, cols=None, delimiter=",", float_format="{:.6f}"): # strip description column sa = self.summary_array.copy() desc = self.summary_array.desc sa["desc"] = [s.strip() for s in desc] return _print_rec_array( sa, cols=cols, delimiter=delimiter, float_format=float_format )
[docs] def stress_period_data_values( self, stress_period_data, criteria, col=None, error_name="", error_type="Warning", ): """ If criteria contains any true values, return the error_type, package name, k,i,j indices, values, and description of error for each row in stress_period_data where criteria=True. """ # check for valid cell indices # self._stress_period_data_valid_indices(stress_period_data) # first check for and list nan values # self._stress_period_data_nans(stress_period_data) # next check for BCs in inactive cells # self._stress_period_data_inactivecells(stress_period_data) if np.any(criteria): # list the values that met the criteria sa = self._list_spd_check_violations( stress_period_data, criteria, col, error_name=error_name, error_type=error_type, ) self.summary_array = np.append(self.summary_array, sa).view( np.recarray ) self.remove_passed(error_name) else: self.append_passed(error_name)
[docs] def values(self, a, criteria, error_name="", error_type="Warning"): """ If criteria contains any true values, return the error_type, package name, indices, array values, and description of error for each True value in criteria. """ if np.any(criteria): inds = np.where(criteria) v = a[inds] # works with structured or unstructured pn = [] * len(v) en = [error_name] * len(v) tp = [error_type] * len(v) indsT = np.transpose(inds) # _get_summary_array requires 3 columns for k, i, j, # but indsT will only have two columns if a 2-D array is being compared # pad indsT with a column of zeros for k if indsT.shape[1] == 2: indsT = np.column_stack( [np.zeros(indsT.shape[0], dtype=int), indsT] ) sa = np.column_stack([tp, pn, indsT, v, en]) sa = self._get_summary_array(sa) self.summary_array = np.append(self.summary_array, sa).view( np.recarray ) self.remove_passed(error_name) else: self.append_passed(error_name)
[docs] def view_summary_array_fields(self, fields): arr = self.summary_array dtype2 = np.dtype({name: arr.dtype.fields[name] for name in fields}) return np.ndarray(arr.shape, dtype2, arr, 0, arr.strides)
[docs] def summarize(self): # write the summary array to text file (all levels) if self.f is not None: self.f.write(self.print_summary()) self.f.close() # print the screen output depending on level txt = "" # tweak screen output for model-level to report package for each error if "MODEL" in self.prefix: # add package name for model summary output packages = self.summary_array.package desc = self.summary_array.desc self.summary_array["desc"] = [ f"\r {packages[i]} package: {d.strip()}" if packages[i] != "model" else d for i, d in enumerate(desc) ] for etype in ["Error", "Warning"]: a = self.summary_array[self.summary_array.type == etype] desc = a.desc t = "" if len(a) > 0: t += f" {len(a)} {etype}s:\n" if len(a) == 1: t = t.replace("s", "") # grammar for e in np.unique(desc): n = np.sum(desc == e) if n > 1: t += f" {n} instances of {e}\n" else: t += f" {n} instance of {e}\n" txt += t if txt == "": txt += " No errors or warnings encountered.\n" elif ( self.f is not None and self.verbose and self.summary_array.shape[0] > 0 ): txt += f" see {self.summaryfile} for details.\n" # print checks that passed for higher levels if len(self.passed) > 0 and self.level > 0: txt += "\n Checks that passed:\n" for chkname in self.passed: txt += f" {chkname}\n" self.txt += txt # for level 2, print the whole summary table at the bottom if self.level > 1: # kludge to improve screen printing self.summary_array["package"] = [ f"{s} " for s in self.summary_array["package"] ] self.txt += "\nDETAILED SUMMARY:\n{}".format( self.print_summary(float_format="{:.2e}", delimiter="\t") ) if self.verbose: print(self.txt) elif self.summary_array.shape[0] > 0 and self.level > 0: print("Errors and/or Warnings encountered.") if self.f is not None: print(f" see {self.summaryfile} for details.\n")
# start of older model specific code def _has_cell_indices(self, stress_period_data): if self.model.has_package("DIS") and {"k", "i", "j"}.intersection( set(stress_period_data.dtype.names) ) != {"k", "i", "j"}: self._add_to_summary( type="Error", desc="\r Stress period data missing k, " "i, j for structured grid.", ) return False elif ( self.model.has_package("DISU") and "node" not in stress_period_data.dtype.names ): self._add_to_summary( type="Error", desc="\r Stress period data missing " "node number for unstructured grid.", ) return False return True def _get_cell_inds(self, spd): return (spd.k, spd.i, spd.j) if self.structured else (spd.node) def _get_cell_inds_names(self): return ["k", "i", "j"] if self.structured else ["node"] def _get_dtype(self): if self.structured: # include node column for structured grids (useful for indexing) return np.dtype( [ ("type", object), ("package", object), ("k", int), ("i", int), ("j", int), ("value", float), ("desc", object), ] ) else: return np.dtype( [ ("type", object), ("package", object), ("node", int), ("value", float), ("desc", object), ] )
[docs] def get_neighbors(self, a): """ For a structured grid, this returns the 6 neighboring values for each value in a. For an unstructured grid, this returns the n_max neighboring values, where n_max is the maximum number of nodal connections for any node within the model; nodes with less than n_max connections are assigned np.nan for indices above the number of connections for that node. Parameters ---------- a : 3-D Model array in layer, row, column order array, even for an unstructured grid; for instance, a Util3d array (e.g. flopy.modflow.ModflowBas.ibound). Returns ------- neighbors : 4-D array Array of neighbors, where axis 0 contains the n neighboring values for each value in a, and subsequent axes are in layer, row, column order. "n" is 6 for a structured grid, and "n" is n_max for an unstructured grid, as described above. Nan is returned for values at edges. """ if self.structured: nk, ni, nj = a.shape tmp = np.empty((nk + 2, ni + 2, nj + 2), dtype=float) tmp[:, :, :] = np.nan tmp[1:-1, 1:-1, 1:-1] = a[:, :, :] neighbors = np.vstack( [ tmp[0:-2, 1:-1, 1:-1].ravel(), # k-1 tmp[2:, 1:-1, 1:-1].ravel(), # k+1 tmp[1:-1, 0:-2, 1:-1].ravel(), # i-1 tmp[1:-1, 2:, 1:-1].ravel(), # i+1 tmp[1:-1, 1:-1, :-2].ravel(), # j-1 tmp[1:-1, 1:-1, 2:].ravel(), ] ) # j+1 return neighbors.reshape(6, nk, ni, nj) else: if "DISU" in self.model.get_package_list(): disu = self.model.disu neighbors = disu._neighboring_nodes if isinstance(a, Util3d): a = a.array pad_value = int(-1e9) n_max = ( np.max(disu.iac.array) - 1 ) # -1 for self, removed below arr_neighbors = [ np.pad( a[n - 1], (0, n_max - n.size), "constant", constant_values=(0, pad_value), ) for n in neighbors ] arr_neighbors = np.where( arr_neighbors == -1e9, np.nan, arr_neighbors ) neighbors = arr_neighbors.T else: # if no disu, we can't define neighbours for this ugrid neighbors = None return neighbors
def _fmt_string_list(array, float_format="{}"): fmt_string = [] for field in array.dtype.descr: vtype = field[1][1].lower() if vtype == "i": fmt_string += ["{:.0f}"] elif vtype == "f": fmt_string += [float_format] elif vtype == "o": fmt_string += ["{}"] elif vtype == "s": raise Exception( "MfList error: 'str' type found in dtype. " "This gives unpredictable results when " "recarray to file - change to 'object' type" ) else: raise Exception( f"MfList.fmt_string error: unknown vtype in dtype:{vtype}" ) return fmt_string def _print_rec_array(array, cols=None, delimiter=" ", float_format="{:.6f}"): """ Print out a numpy record array to string, with column names. Parameters ---------- cols : list of strings List of columns to print. delimiter : string Delimited to use. Returns ------- txt : string Text string of array. """ txt = "" dtypes = list(array.dtype.names) if cols is not None: cols = [c for c in dtypes if c in cols] else: cols = dtypes # drop columns with no data if np.shape(array)[0] > 1: cols = [ c for c in cols if array["type"].dtype.kind == "O" or array[c].min() > -999999 ] # edit dtypes array_cols = fields_view(array, cols) fmts = _fmt_string_list(array_cols, float_format=float_format) txt += delimiter.join(cols) + "\n" array_cols = array_cols.copy().tolist() txt += "\n".join([delimiter.join(fmts).format(*r) for r in array_cols]) return txt
[docs]def fields_view(arr, fields): """ creates view of array that only contains the fields in fields. several-columns-in-numpy-structured-array """ dtype2 = np.dtype({name: arr.dtype.fields[name] for name in fields}) return np.ndarray(arr.shape, dtype2, arr, 0, arr.strides)
[docs]class mf6check(check): def __init__( self, package, f=None, verbose=True, level=1, property_threshold_values={}, ): super().__init__(package, f, verbose, level, property_threshold_values) if hasattr(package, "model_or_sim"): self.model = package.model_or_sim @staticmethod def _get_cellid_cols(inds, inds_col): a = inds[inds_col[0]] return np.asarray(a.tolist()) def _get_cell_inds(self, spd): hnames = () if "cellid" in spd.dtype.names: cellid = spd.cellid elif "cellid1" in spd.dtype.names: cellid = spd.cellid1 else: return None for item in zip(*cellid): hnames += ( np.ndarray( shape=(len(item),), buffer=np.array(item), dtype=np.int32 ), ) return hnames def _get_dtype(self): mg = self.model.modelgrid if mg.grid_type == "structured": return np.dtype( [ ("type", object), ("package", object), ("k", int), ("i", int), ("j", int), ("value", float), ("desc", object), ] ) elif mg.grid_type == "vertex": return np.dtype( [ ("type", object), ("package", object), ("lay", int), ("cell", int), ("value", float), ("desc", object), ] ) else: return np.dtype( [ ("type", object), ("package", object), ("node", int), ("value", float), ("desc", object), ] ) def _has_cell_indices(self, stress_period_data): mg = self.model.modelgrid if ( mg.grid_type == "structured" or mg.grid_type == "vertex" or mg.grid_type == "unstructured" ): if "cellid" not in set( stress_period_data.dtype.names ) and "cellid1" not in set(stress_period_data.dtype.names): self._add_to_summary( type="Error", desc="\r Stress period data missing cellid.", ) return False return True def _get_cell_inds_names(self): return ["cellid"]
[docs] def get_active(self, include_cbd=False): """Returns a boolean array of active cells for the model. Parameters ---------- include_cbd : boolean Does not apply to MF6 models, always false. Returns ------- active : 3-D boolean array True where active. """ mg = self.model.modelgrid idomain = mg.idomain if idomain is None: return np.ones(shape=mg.shape, dtype=bool) else: return idomain > 0