Source code for flopy.mt3d.mtssm

import warnings

import numpy as np

from ..pakbase import Package
from ..utils import MfList, Transient2d, Util2d

# Note: Order matters as first 6 need logical flag on line 1 of SSM file
SsmLabels = ["WEL", "DRN", "RCH", "EVT", "RIV", "GHB", "BAS6", "CHD", "PBC"]


[docs]class SsmPackage: def __init__(self, label="", instance=None, needTFstr=False): self.label = label self.instance = instance self.needTFstr = needTFstr self.TFstr = " F" if self.instance is not None: self.TFstr = " T"
[docs]class Mt3dSsm(Package): """ MT3DMS Source and Sink Mixing Package Class. Parameters ---------- model : model object The model object (of type :class:`flopy.mt3d.mt.Mt3dms`) to which this package will be added. crch : Transient2d, scalar, array of floats, or dictionary CRCH is the concentration of recharge for species 1. If the recharge flux is positive, it acts as a source whose concentration can be specified as desired. If the recharge flux is negative, it acts as a sink (discharge) whose concentration is always set equal to the concentration of groundwater at the cell where discharge occurs. Note that the location and flow rate of recharge/discharge are obtained from the flow model directly through the unformatted flow-transport link file. crch can be specified as an array, if the array is constant for the entire simulation. If crch changes by stress period, then the user must provide a dictionary, where the key is the stress period number (zero based) and the value is the recharge array. The recharge concentration can be specified for additional species by passing additional arguments to the Mt3dSsm constructor. For example, to specify the recharge concentration for species two one could use crch2={0: 0., 1: 10*np.ones((nrow, ncol), dtype=float)} as and additional keyword argument that is passed to Mt3dSsm when making the ssm object. cevt : Transient2d, scalar, array of floats, or dictionary is the concentration of evapotranspiration flux for species 1. Evapotranspiration is the only type of sink whose concentration may be specified externally. Note that the concentration of a sink cannot be greater than that of the aquifer at the sink cell. Thus, if the sink concentration is specified greater than that of the aquifer, it is automatically set equal to the concentration of the aquifer. Also note that the location and flow rate of evapotranspiration are obtained from the flow model directly through the unformatted flow-transport link file. For multi-species simulations, see crch for a description of how to specify additional concentrations arrays for each species. stress_period_data : dictionary Keys in the dictionary are stress zero-based stress period numbers; values in the dictionary are recarrays of SSM boundaries. The dtype for the recarray can be obtained using ssm.dtype (after the ssm package has been created). The default dtype for the recarray is np.dtype([('k', int), ("i", int), ("j", int), ("css", np.float32), ("itype", int), ((cssms(n), float), n=1, ncomp)]) If there are more than one component species, then additional entries will be added to the dtype as indicated by cssm(n). Note that if the number of dictionary entries is less than the number of stress periods, then the last recarray of boundaries will apply until the end of the simulation. Full details of all options to specify stress_period_data can be found in the flopy3_multi-component_SSM ipython notebook in the Notebook subdirectory of the examples directory. css is the specified source concentration or mass-loading rate, depending on the value of ITYPE, in a single-species simulation, (For a multispecies simulation, CSS is not used, but a dummy value still needs to be entered here.) Note that for most types of sources, CSS is interpreted as the source concentration with the unit of mass per unit volume (ML-3), which, when multiplied by its corresponding flow rate (L3T-1) from the flow model, yields the mass-loading rate (MT-1) of the source. For a special type of sources (ITYPE = 15), CSS is taken directly as the mass-loading rate (MT-1) of the source so that no flow rate is required from the flow model. Furthermore, if the source is specified as a constant-concentration cell (itype = -1), the specified value of CSS is assigned directly as the concentration of the designated cell. If the designated cell is also associated with a sink/source term in the flow model, the flow rate is not used. itype is an integer indicating the type of the point source. An itype dictionary can be retrieved from the ssm object as itype = mt3d.Mt3dSsm.itype_dict() (CSSMS(n), n=1, NCOMP) defines the concentrations of a point source for multispecies simulation with NCOMP>1. In a multispecies simulation, it is necessary to define the concentrations of all species associated with a point source. As an example, if a chemical of a certain species is injected into a multispecies system, the concentration of that species is assigned a value greater than zero while the concentrations of all other species are assigned zero. CSSMS(n) can be entered in free format, separated by a comma or space between values. Several important notes on assigning concentration for the constant-concentration condition (ITYPE = -1) are listed below: The constant-concentration condition defined in this input file takes precedence to that defined in the Basic Transport Package input file. In a multiple stress period simulation, a constant-concentration cell, once defined, will remain a constant- concentration cell in the duration of the simulation, but its concentration value can be specified to vary in different stress periods. In a multispecies simulation, if it is only necessary to define different constant-concentration conditions for selected species at the same cell location, specify the desired concentrations for those species, and assign a negative value for all other species. The negative value is a flag used by MT3DMS to skip assigning the constant-concentration condition for the designated species. dtype : np.dtype dtype to use for the recarray of boundaries. If left as None (the default) then the dtype will be automatically constructed. extension : string Filename extension (default is 'ssm') unitnumber : int File unit number (default is None). filenames : str or list of str Filenames to use for the package. If filenames=None the package name will be created using the model name and package extension. If a single string is passed the package will be set to the string. Default is None. Attributes ---------- Methods ------- See Also -------- Notes ----- Examples -------- >>> import flopy >>> m = flopy.mt3d.Mt3dms() >>> itype = mt3d.Mt3dSsm.itype_dict() >>> ssm_data = {} >>> ssm_data[0] = [(4, 4, 4, 1.0, itype['GHB'], 1.0, 100.0)] >>> ssm_data[5] = [(4, 4, 4, 0.5, itype['GHB'], 0.5, 200.0)] >>> ssm = flopy.mt3d.Mt3dSsm(m, stress_period_data=ssm_data) """ def __init__( self, model, crch=None, cevt=None, mxss=None, stress_period_data=None, dtype=None, extension="ssm", unitnumber=None, filenames=None, **kwargs, ): if unitnumber is None: unitnumber = Mt3dSsm._defaultunit() elif unitnumber == 0: unitnumber = Mt3dSsm._reservedunit() # call base package constructor super().__init__( model, extension=extension, name=self._ftype(), unit_number=unitnumber, filenames=self._prepare_filenames(filenames), ) deprecated_kwargs = ["criv", "cghb", "cibd", "cchd", "cpbc", "cwel"] for key in kwargs: if key in deprecated_kwargs: warnings.warn( "Deprecation Warning: Keyword argument '{}' no longer " "supported. Use 'stress_period_data' instead.".format(key), category=UserWarning, ) # Set dimensions mf = self.parent.mf nrow = model.nrow ncol = model.ncol nlay = model.nlay ncomp = model.ncomp nper = model.nper # Create a list of SsmPackage (class defined above) self.__SsmPackages = [] if mf is not None: for i, label in enumerate(SsmLabels): mfpack = mf.get_package(label) ssmpack = SsmPackage(label, mfpack, (i < 6)) self.__SsmPackages.append( ssmpack ) # First 6 need T/F flag in file line 1 if dtype is not None: self.dtype = dtype else: self.dtype = self.get_default_dtype(ncomp) if stress_period_data is None: self.stress_period_data = None else: self.stress_period_data = MfList( self, model=model, data=stress_period_data, list_free_format=False, ) if mxss is None and mf is None: warnings.warn( "SSM Package: mxss is None and modflowmodel is None. " "Cannot calculate max number of sources and sinks. " "Estimating from stress_period_data.", category=UserWarning, ) if mxss is None: # Need to calculate max number of sources and sinks self.mxss = 0 mxss_kper = 0 # Do not assume first key (stress period 0) has data, it may # not. Cycle through stress periods looking for one w/ data if self.stress_period_data is not None: for i in range(nper): if i in self.stress_period_data.data: mxss_kper += np.sum( self.stress_period_data.data[i].itype == -1 ) mxss_kper += np.sum( self.stress_period_data.data[i].itype == -15 ) self.mxss = max(self.mxss, mxss_kper) if isinstance(self.parent.btn.icbund, np.ndarray): self.mxss += (self.parent.btn.icbund < 0).sum() for p in self.__SsmPackages: if (p.label == "BAS6") and (p.instance is not None): self.mxss += (p.instance.ibound.array < 0).sum() elif p.instance is not None: self.mxss += p.instance._ncells() else: self.mxss = mxss # Note: list is used for multi-species, NOT for stress periods! self.crch = None try: if crch is None and model.mf.rch is not None: print("found 'rch' in modflow model, resetting crch to 0.0") crch = 0.0 except: if model.verbose: print(" explicit crcg in file") if crch is not None: self.crch = [] t2d = Transient2d( model, (nrow, ncol), np.float32, crch, name="crch1", locat=self.unit_number[0], array_free_format=False, ) self.crch.append(t2d) if ncomp > 1: for icomp in range(2, ncomp + 1): val = 0.0 name = f"crch{icomp}" if name in list(kwargs.keys()): val = kwargs.pop(name) else: print( "SSM: setting crch for component {} to zero. " "kwarg name {}".format(icomp, name) ) t2d = Transient2d( model, (nrow, ncol), np.float32, val, name=name, locat=self.unit_number[0], array_free_format=False, ) self.crch.append(t2d) # else: # try: # if model.mf.rch is not None: # print("found 'rch' in modflow model, resetting crch to 0.0") # self.crch = [Transient2d(model, (nrow, ncol), np.float32, # 0, name='crch1', # locat=self.unit_number[0], # array_free_format=False)] # # else: # self.crch = None # except: # self.crch = None self.cevt = None try: if cevt is None and ( model.mf.evt is not None or model.mf.ets is not None ): print( "found 'ets'/'evt' in modflow model, resetting cevt to 0.0" ) cevt = 0.0 except: if model.verbose: print(" explicit cevt in file") if cevt is not None: self.cevt = [] t2d = Transient2d( model, (nrow, ncol), np.float32, cevt, name="cevt1", locat=self.unit_number[0], array_free_format=False, ) self.cevt.append(t2d) if ncomp > 1: for icomp in range(2, ncomp + 1): val = 0.0 name = f"cevt{icomp}" if name in list(kwargs.keys()): val = kwargs[name] kwargs.pop(name) else: print( "SSM: setting cevt for component {} to zero, " "kwarg name {}".format(icomp, name) ) t2d = Transient2d( model, (nrow, ncol), np.float32, val, name=name, locat=self.unit_number[0], array_free_format=False, ) self.cevt.append(t2d) # else: # try: # if model.mf.evt is not None or model.mf.ets is not None: # print("found 'ets'/'evt' in modflow model, resetting cevt to 0.0") # self.cevt = [Transient2d(model, (nrow, ncol), np.float32, # 0, name='cevt1', # locat=self.unit_number[0], # array_free_format=False)] # # else: # self.cevt = None # except: # self.cevt = None if len(list(kwargs.keys())) > 0: raise Exception( "SSM error: unrecognized kwargs: " + " ".join(list(kwargs.keys())) ) # Add self to parent and return self.parent.add_package(self) return
[docs] def from_package(self, package, ncomp_aux_names): """ read the point source and sink info from a package ncomp_aux_names (list): the aux variable names in the package that are the component concentrations """ raise NotImplementedError()
[docs] @staticmethod def itype_dict(): itype = {} itype["CHD"] = 1 itype["BAS6"] = 1 itype["PBC"] = 1 itype["WEL"] = 2 itype["DRN"] = 3 itype["RIV"] = 4 itype["GHB"] = 5 itype["MAS"] = 15 itype["CC"] = -1 return itype
[docs] @staticmethod def get_default_dtype(ncomp=1): """ Construct a dtype for the recarray containing the list of sources and sinks """ type_list = [ ("k", int), ("i", int), ("j", int), ("css", np.float32), ("itype", int), ] if ncomp > 1: for comp in range(1, ncomp + 1): comp_name = f"cssm({comp:02d})" type_list.append((comp_name, np.float32)) dtype = np.dtype(type_list) return dtype
[docs] def write_file(self): """ Write the package file Returns ------- None """ # Open file for writing f_ssm = open(self.fn_path, "w") for p in self.__SsmPackages: if p.needTFstr: f_ssm.write(p.TFstr) f_ssm.write(" F F F F F F F F F F\n") f_ssm.write(f"{self.mxss:10d}\n") # Loop through each stress period and write ssm information nper = self.parent.nper for kper in range(nper): if f_ssm.closed: f_ssm = open(f_ssm.name, "a") # Distributed sources and sinks (Recharge and Evapotranspiration) if self.crch is not None: # If any species need to be written, then all need to be # written incrch = -1 for t2d in self.crch: incrchicomp, file_entry = t2d.get_kper_entry(kper) incrch = max(incrch, incrchicomp) if incrch == 1: break f_ssm.write(f"{incrch:10d}\n") if incrch == 1: for t2d in self.crch: u2d = t2d[kper] file_entry = u2d.get_file_entry() f_ssm.write(file_entry) if self.cevt is not None: # If any species need to be written, then all need to be # written incevt = -1 for t2d in self.cevt: incevticomp, file_entry = t2d.get_kper_entry(kper) incevt = max(incevt, incevticomp) if incevt == 1: break f_ssm.write(f"{incevt:10d}\n") if incevt == 1: for t2d in self.cevt: u2d = t2d[kper] file_entry = u2d.get_file_entry() f_ssm.write(file_entry) # List of sources if self.stress_period_data is not None: self.stress_period_data.write_transient(f_ssm, single_per=kper) else: f_ssm.write("0\n") f_ssm.close() return
[docs] @classmethod def load( cls, f, model, nlay=None, nrow=None, ncol=None, nper=None, ncomp=None, ext_unit_dict=None, ): """ Load an existing package. Parameters ---------- f : filename or file handle File to load. model : model object The model object (of type :class:`flopy.mt3d.mt.Mt3dms`) to which this package will be added. ext_unit_dict : dictionary, optional If the arrays in the file are specified using EXTERNAL, or older style array control records, then `f` should be a file handle. In this case ext_unit_dict is required, which can be constructed using the function :class:`flopy.utils.mfreadnam.parsenamefile`. Returns ------- ssm : Mt3dSsm object Mt3dSsm object. Examples -------- >>> import flopy >>> mt = flopy.mt3d.Mt3dms() >>> ssm = flopy.mt3d.Mt3dSsm.load('test.ssm', mt) """ if model.verbose: print("loading ssm package file...") # Open file, if necessary openfile = not hasattr(f, "read") if openfile: filename = f f = open(filename, "r") # Set modflow model and dimensions if necessary mf = model.mf if nlay is None: nlay = model.nlay if nrow is None: nrow = model.nrow if ncol is None: ncol = model.ncol if nper is None: nper = model.nper if ncomp is None: ncomp = model.ncomp # dtype dtype = Mt3dSsm.get_default_dtype(ncomp) # Dataset 0 -- comment line while True: line = f.readline() if line[0] != "#": break # Item D1: Dummy input line - line already read above if model.verbose: print( " loading FWEL, FDRN, FRCH, FEVT, FRIV, FGHB, (FNEW(n), n=1,4)..." ) fwel = line[0:2] fdrn = line[2:4] frch = line[4:6] fevt = line[6:8] friv = line[8:10] fghb = line[10:12] if len(line) >= 14: fnew1 = line[12:14] else: fnew1 = "F" if len(line) >= 16: fnew2 = line[14:16] else: fnew2 = "F" if len(line) >= 18: fnew3 = line[16:18] else: fnew3 = "F" if len(line) >= 20: fnew4 = line[18:20] else: fnew4 = "F" if model.verbose: print(f" FWEL {fwel}") print(f" FDRN {fdrn}") print(f" FRCH {frch}") print(f" FEVT {fevt}") print(f" FRIV {friv}") print(f" FGHB {fghb}") print(f" FNEW1 {fnew1}") print(f" FNEW2 {fnew2}") print(f" FNEW3 {fnew3}") print(f" FNEW4 {fnew4}") # Override the logical settings at top of ssm file using the # modflowmodel, if it is attached to parent if mf is not None: rchpack = mf.get_package("RCH") if rchpack is not None: frch = "t" evtpack = mf.get_package("EVT") if evtpack is not None: fevt = "t" # Item D2: MXSS, ISSGOUT mxss = None if model.verbose: print(" loading MXSS, ISSGOUT...") line = f.readline() mxss = int(line[0:10]) try: issgout = int(line[10:20]) except: issgout = 0 if model.verbose: print(f" MXSS {mxss}") print(f" ISSGOUT {issgout}") # kwargs needed to construct crch2, crch3, etc. for multispecies kwargs = {} crch = None if "t" in frch.lower(): t2d = 0.0 crch = {0: t2d} if ncomp > 1: for icomp in range(2, ncomp + 1): name = f"crch{icomp}" t2d = 0.0 kwargs[name] = {0: t2d} cevt = None if "t" in fevt.lower(): t2d = 0.0 cevt = {0: t2d} if ncomp > 1: for icomp in range(2, ncomp + 1): name = f"cevt{icomp}" t2d = 0.0 kwargs[name] = {0: t2d} stress_period_data = {} for iper in range(nper): if model.verbose: print(f" loading ssm for kper {iper + 1:5d}") # Item D3: INCRCH incrch = -1 if "t" in frch.lower(): if model.verbose: print(" loading INCRCH...") line = f.readline() incrch = int(line[0:10]) # Item D4: CRCH if incrch >= 0: if model.verbose: print(" loading CRCH...") t = Util2d.load( f, model, (nrow, ncol), np.float32, "crch", ext_unit_dict, array_format="mt3d", ) crch[iper] = t # Load each multispecies array if ncomp > 1: for icomp in range(2, ncomp + 1): name = f"crch{icomp}" if model.verbose: print(f" loading {name}...") t = Util2d.load( f, model, (nrow, ncol), np.float32, name, ext_unit_dict, array_format="mt3d", ) crchicomp = kwargs[name] crchicomp[iper] = t # Item D5: INCEVT incevt = -1 if "t" in fevt.lower(): if model.verbose: print(" loading INCEVT...") line = f.readline() incevt = int(line[0:10]) # Item D6: CEVT if incevt >= 0: if model.verbose: print(" loading CEVT...") t = Util2d.load( f, model, (nrow, ncol), np.float32, "cevt", ext_unit_dict, array_format="mt3d", ) cevt[iper] = t # Load each multispecies array if ncomp > 1: for icomp in range(2, ncomp + 1): name = f"cevt{icomp}" if model.verbose: print(f" loading {name}...") t = Util2d.load( f, model, (nrow, ncol), np.float32, name, ext_unit_dict, array_format="mt3d", ) cevticomp = kwargs[name] cevticomp[iper] = t # Item D7: NSS if model.verbose: print(" loading NSS...") line = f.readline() nss = int(line[0:10]) if model.verbose: print(f" NSS {nss}") # Item D8: KSS, ISS, JSS, CSS, ITYPE, (CSSMS(n),n=1,NCOMP) if model.verbose: print( " loading KSS, ISS, JSS, CSS, ITYPE, " "(CSSMS(n),n=1,NCOMP)..." ) if nss > 0: current = np.empty((nss), dtype=dtype) for ibnd in range(nss): line = f.readline() t = [] for ivar in range(5): istart = ivar * 10 istop = istart + 10 t.append(line[istart:istop]) ncssms = len(current.dtype.names) - 5 if ncssms > 0: tt = line[istop:].strip().split() for ivar in range(ncssms): t.append(tt[ivar]) current[ibnd] = tuple(t[: len(current.dtype.names)]) # convert indices to zero-based current["k"] -= 1 current["i"] -= 1 current["j"] -= 1 current = current.view(np.recarray) stress_period_data[iper] = current elif nss == 0: stress_period_data[iper] = nss if openfile: f.close() # set package unit number unitnumber = None filenames = [None] if ext_unit_dict is not None: unitnumber, filenames[0] = model.get_ext_dict_attr( ext_unit_dict, filetype=Mt3dSsm._ftype() ) # Construct and return ssm package return cls( model, crch=crch, cevt=cevt, mxss=mxss, stress_period_data=stress_period_data, unitnumber=unitnumber, filenames=filenames, **kwargs, )
@staticmethod def _ftype(): return "SSM" @staticmethod def _defaultunit(): return 34 @staticmethod def _reservedunit(): return 4