Source code for flopy.mt3d.mtsft

import numpy as np

from ..pakbase import Package
from ..utils import MfList, Util2d

__author__ = "emorway"


[docs]class Mt3dSft(Package): """ MT3D-USGS StreamFlow Transport package class Parameters ---------- model : model object The model object (of type :class:`flopy.mt3dms.mt.Mt3dms`) to which this package will be added. nsfinit : int Is the number of simulated stream reaches (in SFR2, the number of stream reaches is greater than or equal to the number of stream segments). This is equal to NSTRM found on the first line of the SFR2 input file. If NSFINIT > 0 then surface-water transport is solved in the stream network while taking into account groundwater exchange and precipitation and evaporation sources and sinks. Otherwise, if NSFINIT < 0, the surface-water network as represented by the SFR2 flow package merely acts as a boundary condition to the groundwater transport problem; transport in the surface-water network is not simulated. mxsfbc : int Is the maximum number of stream boundary conditions. icbcsf : int Is an integer value that directs MT3D-USGS to write reach-by-reach concentration information to unit ICBCSF. ioutobs : int Is the unit number of the output file for simulated concentrations at specified gage locations. The NAM file must also list the unit number to which observation information will be written. ietsfr : int Specifies whether or not mass will exit the surface-water network with simulated evaporation. If IETSFR = 0, then mass does not leave via stream evaporation. If IETSFR > 0, then mass is allowed to exit the simulation with the simulated evaporation. isfsolv : int Specifies the numerical technique that will be used to solve the transport problem in the surface water network. The first release of MT3D-USGS (version 1.0) only allows for a finite-difference formulation and regardless of what value the user specifies, the variable defaults to 1, meaning the finite-difference solution is invoked. wimp : float Is the stream solver time weighting factor. Ranges between 0.0 and 1.0. Values of 0.0, 0.5, or 1.0 correspond to explicit, Crank-Nicolson, and fully implicit schemes, respectively. wups : float Is the space weighting factor employed in the stream network solver. Ranges between 0.0 and 1.0. Values of 0.0 and 1.0 correspond to a central-in-space and upstream weighting factors, respectively. cclosesf : float Is the closure criterion for the SFT solver mxitersf : int Limits the maximum number of iterations the SFT solver can use to find a solution of the stream transport problem. crntsf : float Is the Courant constraint specific to the SFT time step, its value has no bearing upon the groundwater transport solution time step. iprtxmd : int A flag to print SFT solution information to the standard output file. IPRTXMD = 0 means no SFT solution information is printed; IPRTXMD = 1 means SFT solution summary information is printed at the end of every MT3D-USGS outer iteration; and IPRTXMD = 2 means SFT solution details are written for each SFT outer iteration that calls the xMD solver that solved SFT equations. coldsf : array of floats Represents the initial concentrations in the surface water network. The length of the array is equal to the number of stream reaches and starting concentration values should be entered in the same order that individual reaches are entered for record set 2 in the SFR2 input file. To specify starting concentrations for other species in a multi-species simulation, include additional keywords, such as coldsf2, coldsf3, and so forth. dispsf : array of floats Is the dispersion coefficient [L2 T-1] for each stream reach in the simulation and can vary for each simulated component of the simulation. That is, the length of the array is equal to the number of simulated stream reaches times the number of simulated components. Values of dispersion for each reach should be entered in the same order that individual reaches are entered for record set 2 in the SFR2 input file. To specify dispsf for other species in a multi-species simulation, include additional keywords, such as dispsf2, dispsf3, and so forth. nobssf : int Specifies the number of surface flow observation points for monitoring simulated concentrations in streams. isobs : int The segment number for each stream flow concentration observation point. irobs : int The reach number for each stream flow concentration observation point. ntmp : int The number of specified stream boundary conditions to follow. For the first stress period, this value must be greater than or equal to zero, but may be less than zero in subsequent stress periods. isegbc : int Is the segment number for which the current boundary condition will be applied. irchbc : int Is the reach number for which the current boundary condition will be applied. isfbctyp : int Specifies, for ISEGBC/IRCHBC, what the boundary condition type is 0 A headwater boundary. That is, for streams entering at the boundary of the simulated domain that need a specified concentration, use ISFBCTYP = 0 1 a precipitation boundary. If precipitation directly to channels is simulated in the flow model and a non-zero concentration (default is zero) is desired, use ISFBCTYP = 1 2 a runoff boundary condition that is not the same thing as runoff simulated in the UZF1 package and routed to a stream (or lake) using the IRNBND array. Users who specify runoff in the SFR2 input via the RUNOFF variable appearing in either record sets 4b or 6a and want to assign a non-zero concentration (default is zero) associated with this specified source, use ISFBCTYP=2; 3 a constant-concentration boundary. Any ISEGBC/IRCHBC combination may set equal to a constant concentration boundary condition. 4 a pumping boundary condition. 5 an evaporation boundary condition. In models where evaporation is simulated directly from the surface of the channel, users can use this boundary condition to specify a non-zero concentration (default is zero) associated with the evaporation losses. cbcsf : float Is the specified concentration associated with the current boundary condition entry. Repeat CBCSF for each simulated species (NCOMP). extension : string Filename extension (default is 'sft') unitnumber : int File unit number (default is None). filenames : str or list of str Filenames to use for the package and the output files. If filenames=None the package name will be created using the model name and package extension and the sfr output name will be created using the model name and lake concentration observation extension (for example, modflowtest.cbc and modflowtest.sftcobs.out), if ioutobs is a number greater than zero. If a single string is passed the package will be set to the string and sfr concentration observation output name will be created using the model name and .sftcobs.out extension, if ioutobs is a number greater than zero. To define the names for all package files (input and output) the length of the list of strings should be 2. Default is None. Attributes ---------- Methods ------- See Also -------- Notes ----- Parameters are not supported in FloPy. Examples -------- >>> import flopy >>> datadir = 'examples/data/mt3d_test/mfnwt_mt3dusgs/sft_crnkNic' >>> mf = flopy.modflow.Modflow.load( ... 'CrnkNic.nam', model_ws=datadir, load_only=['dis', 'bas6']) >>> sfr = flopy.modflow.ModflowSfr2.load('CrnkNic.sfr2', mf) >>> chk = sfr.check() >>> # initialize an MT3D-USGS model >>> mt = flopy.mt3d.Mt3dms.load( ... 'CrnkNic.mtnam', exe_name='mt3d-usgs_1.0.00.exe', >>> model_ws=datadir, load_only='btn') >>> sft = flopy.mt3d.Mt3dSft.load(mt, 'CrnkNic.sft') """ def __init__( self, model, nsfinit=0, mxsfbc=0, icbcsf=0, ioutobs=0, ietsfr=0, isfsolv=1, wimp=0.50, wups=1.00, cclosesf=1.0e-6, mxitersf=10, crntsf=1.0, iprtxmd=0, coldsf=0.0, dispsf=0.0, nobssf=0, obs_sf=None, sf_stress_period_data=None, unitnumber=None, filenames=None, dtype=None, extension="sft", **kwargs, ): # set default unit number of one is not specified if unitnumber is None: unitnumber = Mt3dSft._defaultunit() elif unitnumber == 0: unitnumber = Mt3dSft._reservedunit() # set filenames filenames = self._prepare_filenames(filenames, 2) if filenames[1] is None and abs(ioutobs) > 0: filenames[1] = model.name if ioutobs is not None: ext = "sftcobs.out" if filenames[1] is not None: fname = filenames[1] if "." not in fname: # add extension fname += f".{ext}" else: fname = f"{model.name}.{ext}" model.add_output_file( abs(ioutobs), fname=fname, extension=None, binflag=False, package=self._ftype(), ) else: ioutobs = 0 # call base package constructor super().__init__( model, extension=extension, name=self._ftype(), unit_number=unitnumber, filenames=filenames[0], ) # Set dimensions nrow = model.nrow ncol = model.ncol nlay = model.nlay ncomp = model.ncomp mcomp = model.mcomp # Set package specific parameters self.nsfinit = nsfinit self.mxsfbc = mxsfbc self.icbcsf = icbcsf self.ioutobs = ioutobs self.ietsfr = ietsfr self.isfsolv = isfsolv self.wimp = wimp self.wups = wups self.cclosesf = cclosesf self.mxitersf = mxitersf self.crntsf = crntsf self.iprtxmd = iprtxmd # Set 1D array values self.coldsf = [ Util2d( model, (nsfinit,), np.float32, coldsf, name="coldsf", locat=self.unit_number[0], array_free_format=False, ) ] self.dispsf = [ Util2d( model, (nsfinit,), np.float32, dispsf, name="dispsf", locat=self.unit_number[0], array_free_format=False, ) ] ncomp = model.ncomp # handle the miult if ncomp > 1: for icomp in range(2, ncomp + 1): for base_name, attr in zip( ["coldsf", "dispsf"], [self.coldsf, self.dispsf] ): name = f"{base_name}{icomp}" if name in kwargs: val = kwargs.pop(name) else: print( f"SFT: setting {base_name} for component {icomp} " f"to zero, kwarg name {name}" ) val = 0.0 u2d = Util2d( model, (nsfinit,), np.float32, val, name=name, locat=self.unit_number[0], array_free_format=model.free_format, ) attr.append(u2d) # Set streamflow observation locations self.nobssf = nobssf self.obs_sf = obs_sf # Read and set transient data if dtype is not None: self.dtype = dtype else: self.dtype = self.get_default_dtype(ncomp) if sf_stress_period_data is None or len(sf_stress_period_data) == 0: self.sf_stress_period_data = None else: self.sf_stress_period_data = MfList( self, model=model, data=sf_stress_period_data ) self.sf_stress_period_data.list_free_format = True self.parent.add_package(self) return
[docs] @staticmethod def get_default_dtype(ncomp=1): """ Construct a dtype for the recarray containing the list of surface water boundary conditions. """ type_list = [ ("node", int), ("isfbctyp", int), ("cbcsf0", np.float32), ] if ncomp > 1: for icomp in range(1, ncomp): comp_name = f"cbcsf{icomp}" type_list.append((comp_name, np.float32)) dtype = np.dtype(type_list) return dtype
[docs] def write_file(self): """ Write the package file Returns ------- None Examples -------- >>> import flopy >>> datadir = .examples/data/mt3d_test/mfnwt_mt3dusgs/sft_crnkNic >>> mt = flopy.mt3d.Mt3dms.load( ... 'CrnkNic.mtnam', exe_name='mt3d-usgs_1.0.00.exe', ... model_ws=datadir, verbose=True) >>> mt.name = 'CrnkNic_rewrite' >>> mt.sft.dispsf.fmtin = '(10F12.2)' >>> mt.write_input() """ # Open file for writing f = open(self.fn_path, "w") # Item 1 f.write( "{:10d}{:10d}{:10d}{:10d}{:10d}".format( self.nsfinit, self.mxsfbc, self.icbcsf, self.ioutobs, self.ietsfr, ) + 30 * " " + "# nsfinit, mxsfbc, icbcsf, ioutobs, ietsfr\n" ) # Item 2 f.write( "{:10d}{:10.5f}{:10.5f}{:10.7f}{:10d}{:10.5f}{:10d}".format( self.isfsolv, self.wimp, self.wups, self.cclosesf, self.mxitersf, self.crntsf, self.iprtxmd, ) + " # isfsolv, wimp, wups, cclosesf, mxitersf, crntsf, " + "iprtxmd\n" ) # Item 3 for coldsf in self.coldsf: f.write(coldsf.get_file_entry()) # Item 4 for dispsf in self.dispsf: f.write(dispsf.get_file_entry()) # Item 5 f.write(f"{self.nobssf:10d} # nobssf\n") # Item 6 if self.nobssf != 0: for iobs in self.obs_sf: line = ( f"{iobs:10d} " "# location of obs as given by position in irch list\n" ) f.write(line) # Items 7, 8 # Loop through each stress period and assign source & sink concentrations to stream features nper = self.parent.nper for kper in range(nper): if f.closed: f = open(f.name, "a") # List of concentrations associated with various boundaries # interacting with the stream network. if self.sf_stress_period_data is not None: self.sf_stress_period_data.write_transient(f, single_per=kper) else: f.write(f"{0:10d} # ntmp - SP {kper:5d}\n") f.close() return
[docs] @classmethod def load( cls, f, model, nsfinit=None, nper=None, ncomp=None, ext_unit_dict=None ): """ Load an existing package. Parameters ---------- f : filename or file handle File to load. model : model object The model object (of type :class:`flopy.mt3d.mt.Mt3dms`) to which this package will be added. nsfinit : int number of simulated stream reaches in the surface-water transport process. isfsolv : int Specifies the numerical technique that will be used to solve the transport problem in the surface water network. The first release of MT3D-USGS (version 1.0) only allows for a finite-difference formulation and regardless of what value the user specifies, the variable defaults to 1, meaning the finite-difference solution is invoked. wimp : float Is the stream solver time weighting factor. Ranges between 0.0 and 1.0. Values of 0.0, 0.5, or 1.0 correspond to explicit, Crank-Nicolson, and fully implicit schemes, respectively. wups : float Is the space weighting factor employed in the stream network solver. Ranges between 0.0 and 1.0. Values of 0.0 and 1.0 correspond to a central-in-space and upstream weighting factors, respectively. cclosesf : float Is the closure criterion for the SFT solver mxitersf : int Limits the maximum number of iterations the SFT solver can use to find a solution of the stream transport problem. crntsf : float Is the Courant constraint specific to the SFT time step, its value has no bearing upon the groundwater transport solution time step. iprtxmd : int a flag to print SFT solution information to the standard output file. IPRTXMD can equal 0, 1, or 2, and will write increasing amounts of solver information to the standard output file, respectively. Returns ------- sft : MT3D-USGS object MT3D-USGS object Examples -------- >>> import os >>> import flopy >>> mf = flopy.modflow.Modflow.load('CrnkNic_mf.nam', ... load_only=['dis', 'bas6']) >>> sfr = flopy.modflow.ModflowSfr2.load('CrnkNic.sfr2', mf) >>> mt = flopy.mt3d.Mt3dms.load('CrnkNic_mt.nam', load_only='btn') >>> sft = flopy.mt3d.Mt3dSft.load('CrnkNic.sft', mt) """ if model.verbose: print("loading sft package file...") openfile = not hasattr(f, "read") if openfile: filename = f f = open(filename, "r") # Set default nlay values nlay = None nrow = None ncol = None # Set dimensions if necessary if nlay is None: nlay = model.nlay if nrow is None: nrow = model.nrow if ncol is None: ncol = model.ncol if nper is None: nper = model.nper if ncomp is None: ncomp = model.ncomp dtype = Mt3dSft.get_default_dtype(ncomp) # Item 1 (NSFINIT, MXSFBC, ICBCSF, IOUTOBS, IETSFR) line = f.readline() if line[0] == "#": raise ValueError("SFT package does not support comment lines") if model.verbose: print(" loading nsfinit, mxsfbc, icbcsf, ioutobs, ietsfr...") vals = line.strip().split() nsfinit = int(vals[0]) mxsfbc = int(vals[1]) icbcsf = int(vals[2]) ioutobs = int(vals[3]) ietsfr = int(vals[4]) if model.verbose: print(f" NSFINIT {nsfinit}") print(f" MXSFBC {mxsfbc}") print(f" ICBCSF {icbcsf}") print(f" IOUTOBS {ioutobs}") print(f" IETSFR {ietsfr}") if ietsfr == 0: print( " Mass does not exit the model via simulated " "stream evaporation " ) else: print( " Mass exits the stream network via simulated " "stream evaporation " ) # Item 2 (ISFSOLV, WIMP, WUPS, CCLOSESF, MXITERSF, CRNTSF, IPRTXMD) line = f.readline() if model.verbose: print( " loading isfsolv, wimp, wups, cclosesf, mxitersf, " "crntsf, iprtxmd..." ) vals = line.strip().split() if len(vals) < 7: raise ValueError("expected 7 values for item 2 of SFT input file") else: isfsolv = int(vals[0]) wimp = float(vals[1]) wups = float(vals[2]) cclosesf = float(vals[3]) mxitersf = int(vals[4]) crntsf = float(vals[5]) iprtxmd = int(vals[6]) if isfsolv != 1: isfsolv = 1 print(" Resetting isfsolv to 1") print(" In version 1.0 of MT3D-USGS, isfsov=1 is only option") if model.verbose: print(f" ISFSOLV {isfsolv}") print(f" WIMP {wimp}") print(f" WUPS {wups}") print(f" CCLOSESF {cclosesf}") print(f" MXITERSF {mxitersf}") print(f" CRNTSF {crntsf}") print(f" IPRTXMD {iprtxmd}") # Item 3 (COLDSF(NRCH)) Initial concentration if model.verbose: print(" loading COLDSF...") if model.free_format: print( " Using MODFLOW style array reader utilities to " "read COLDSF" ) elif model.array_format == "mt3d": print( " Using historic MT3DMS array reader utilities to " "read COLDSF" ) coldsf = Util2d.load( f, model, (np.abs(nsfinit),), np.float32, "coldsf1", ext_unit_dict, array_format=model.array_format, ) kwargs = {} if ncomp > 1: for icomp in range(2, ncomp + 1): name = f"coldsf{icomp}" if model.verbose: print(f" loading {name}...") u2d = Util2d.load( f, model, (nsfinit,), np.float32, name, ext_unit_dict, array_format=model.array_format, ) kwargs[name] = u2d # Item 4 (DISPSF(NRCH)) Reach-by-reach dispersion if model.verbose: if model.free_format: print( " Using MODFLOW style array reader utilities to " "read DISPSF" ) elif model.array_format == "mt3d": print( " Using historic MT3DMS array reader utilities to " "read DISPSF" ) dispsf = Util2d.load( f, model, (np.abs(nsfinit),), np.float32, "dispsf1", ext_unit_dict, array_format=model.array_format, ) if ncomp > 1: for icomp in range(2, ncomp + 1): name = f"dispsf{icomp}" if model.verbose: print(f" loading {name}...") u2d = Util2d.load( f, model, (np.abs(nsfinit),), np.float32, name, ext_unit_dict, array_format=model.array_format, ) kwargs[name] = u2d # Item 5 NOBSSF if model.verbose: print(" loading NOBSSF...") line = f.readline() m_arr = line.strip().split() nobssf = int(m_arr[0]) if model.verbose: print(f" NOBSSF {nobssf}") # If NOBSSF > 0, store observation segment & reach (Item 6) obs_sf = [] if nobssf > 0: if model.verbose: print( " loading {} observation locations given by ISOBS, " "IROBS...".format(nobssf) ) for i in range(nobssf): line = f.readline() m_arr = line.strip().split() obs_sf.append(int(m_arr[0])) obs_sf = np.array(obs_sf) if model.verbose: print(" Surface water concentration observation locations:") text = "" for o in obs_sf: text += f"{o} " print(f" {text}\n") else: if model.verbose: print(" No observation points specified.") sf_stress_period_data = {} for iper in range(nper): # Item 7 NTMP (Transient data) if model.verbose: print(f" loading NTMP...stress period {iper + 1} of {nper}") line = f.readline() m_arr = line.strip().split() ntmp = int(m_arr[0]) # Item 8 ISEGBC, IRCHBC, ISFBCTYP, CBCSF if model.verbose: print( " loading {} instances of ISEGBC, IRCHBC, " "ISFBCTYP, CBCSF...stress period {} of {}".format( ntmp, iper + 1, nper ) ) current_sf = 0 if ntmp > 0: current_sf = np.empty(ntmp, dtype=dtype) for ibnd in range(ntmp): line = f.readline() m_arr = line.strip().split() t = [] for ivar in range(3): # First three terms are not variable t.append(m_arr[ivar]) cbcsf = len(current_sf.dtype.names) - 3 if cbcsf > 0: for ivar in range(cbcsf): t.append(m_arr[ivar + 3]) current_sf[ibnd] = tuple( map(float, t[: len(current_sf.dtype.names)]) ) # Convert node IRCH indices to zero-based current_sf["node"] -= 1 current_sf = current_sf.view(np.recarray) sf_stress_period_data[iper] = current_sf else: if model.verbose: print(" No transient boundary conditions specified") pass if openfile: f.close() # 1 item for SFT input file, 1 item for SFTOBS file unitnumber = None filenames = [None, None] if ext_unit_dict is not None: unitnumber, filenames[0] = model.get_ext_dict_attr( ext_unit_dict, filetype=Mt3dSft._ftype() ) if abs(ioutobs) > 0: iu, filenames[1] = model.get_ext_dict_attr( ext_unit_dict, unit=abs(ioutobs) ) model.add_pop_key_list(abs(ioutobs)) # Construct and return SFT package return cls( model, nsfinit=nsfinit, mxsfbc=mxsfbc, icbcsf=icbcsf, ioutobs=ioutobs, ietsfr=ietsfr, isfsolv=isfsolv, wimp=wimp, cclosesf=cclosesf, mxitersf=mxitersf, crntsf=crntsf, iprtxmd=iprtxmd, coldsf=coldsf, dispsf=dispsf, nobssf=nobssf, obs_sf=obs_sf, sf_stress_period_data=sf_stress_period_data, unitnumber=unitnumber, filenames=filenames, **kwargs, )
@staticmethod def _ftype(): return "SFT" @staticmethod def _defaultunit(): return 19 @staticmethod def _reservedunit(): return 19