Source code for flopy.mt3d.mt

import os
import sys
import numpy as np
from ..mbase import BaseModel
from ..pakbase import Package
from ..utils import mfreadnam
from .mtbtn import Mt3dBtn
from .mtadv import Mt3dAdv
from .mtdsp import Mt3dDsp
from .mtssm import Mt3dSsm
from .mtrct import Mt3dRct
from .mtgcg import Mt3dGcg
from .mttob import Mt3dTob
from .mtphc import Mt3dPhc
from .mtuzt import Mt3dUzt
from .mtsft import Mt3dSft
from .mtlkt import Mt3dLkt
from ..discretization.structuredgrid import StructuredGrid
from flopy.discretization.modeltime import ModelTime


[docs]class Mt3dList(Package): """ List package class """ def __init__(self, model, extension="list", listunit=7): # Call ancestor's init to set self.parent, extension, name and # unit number Package.__init__(self, model, extension, "LIST", listunit) # self.parent.add_package(self) This package is not added to the base # model so that it is not included in get_name_file_entries() return def __repr__(self): return "List package class"
[docs] def write_file(self): # Not implemented for list class return
""" class Mt3dms(BaseModel): 'MT3DMS base class' def __init__(self, modelname='mt3dmstest', namefile_ext='nam', modflowmodel=None, ftlfilename=None, model_ws=None, external_path=None, verbose=False, load=True, listunit=7, exe_name='mt3dms.exe', ): BaseModel.__init__(self, modelname, namefile_ext, model_ws=model_ws, exe_name=exe_name) self.heading = '# Name file for MT3DMS, generated by Flopy.' self.__mf = modflowmodel self.lst = Mt3dList(self, listunit=listunit) self.ftlfilename = ftlfilename self.__adv = None self.__btn = None self.__dsp = None self.__gcg = None self.__rct = None self.__ssm = None self.array_free_format = False self.external_path = external_path self.external = False self.external_fnames = [] self.external_units = [] self.external_binflag = [] self.load = load self.__next_ext_unit = 500 if external_path is not None: if os.path.exists(external_path): print("Note: external_path " + str(external_path) + \ " already exists") # assert os.path.exists(external_path),'external_path does not exist' else: os.mkdir(external_path) self.external = True self.verbose = verbose return def __repr__(self): return 'MT3DMS model' def get_ncomp(self): btn = self.get_package('BTN') if (btn): return btn.ncomp else: return 1 # function to encapsulate next_ext_unit attribute def next_ext_unit(self): self.__next_ext_unit += 1 return self.__next_ext_unit def getadv(self): if (self.__adv == None): for p in (self.packagelist): if isinstance(p, Mt3dAdv): self.__adv = p return self.__adv def getbtn(self): if (self.__btn == None): for p in (self.packagelist): if isinstance(p, Mt3dBtn): self.__btn = p return self.__btn def getdsp(self): if (self.__dsp == None): for p in (self.packagelist): if isinstance(p, Mt3dDsp): self.__dsp = p return self.__dsp def getgcg(self): if (self.__gcg == None): for p in (self.packagelist): if isinstance(p, Mt3dGcg): self.__gcg = p return self.__gcg def getmf(self): return self.__mf def getrct(self): if (self.__rct == None): for p in (self.packagelist): if isinstance(p, Mt3dRct): self.__rct = p return self.__rct def getssm(self): if (self.__ssm == None): for p in (self.packagelist): if isinstance(p, Mt3dSsm): self.__ssm = p return self.__ssm def write_name_file(self): fn_path = os.path.join(self.model_ws, self.namefile) f_nam = open(fn_path, 'w') f_nam.write('%s\n' % (self.heading)) f_nam.write('%s %3i %s\n' % (self.lst.name[0], self.lst.unit_number[0], self.lst.file_name[0])) if self.ftlfilename is not None: f_nam.write('%s %3i %s\n' % ('FTL', 39, self.ftlfilename)) f_nam.write('%s' % self.get_name_file_entries()) for u, f in zip(self.external_units, self.external_fnames): f_nam.write('DATA {0:3d} '.format(u) + f + '\n') f_nam.close() adv = property(getadv) # Property has no setter, so read-only btn = property(getbtn) # Property has no setter, so read-only dsp = property(getdsp) # Property has no setter, so read-only gcg = property(getgcg) # Property has no setter, so read-only mf = property(getmf) # Property has no setter, so read-only rct = property(getrct) # Property has no setter, so read-only ssm = property(getssm) # Property has no setter, so read-only ncomp = property(get_ncomp) """
[docs]class Mt3dms(BaseModel): """ MT3DMS Model Class. Parameters ---------- modelname : string, optional Name of model. This string will be used to name the MODFLOW input that are created with write_model. (the default is 'mt3dtest') namefile_ext : string, optional Extension for the namefile (the default is 'nam') modflowmodel : flopy.modflow.mf.Modflow This is a flopy Modflow model object upon which this Mt3dms model is based. (the default is None) version : string, optional Version of MT3DMS to use (the default is 'mt3dms'). exe_name : string, optional The name of the executable to use (the default is 'mt3dms.exe'). listunit : integer, optional Unit number for the list file (the default is 2). model_ws : string, optional model workspace. Directory name to create model data sets. (default is the present working directory). external_path : string Location for external files (default is None). verbose : boolean, optional Print additional information to the screen (default is False). load : boolean, optional (default is True). silent : integer (default is 0) Attributes ---------- Methods ------- See Also -------- Notes ----- Examples -------- >>> import flopy >>> m = flopy.mt3d.mt.Mt3dms() """ def __init__( self, modelname="mt3dtest", namefile_ext="nam", modflowmodel=None, ftlfilename="mt3d_link.ftl", ftlfree=False, version="mt3dms", exe_name="mt3dms.exe", structured=True, listunit=None, ftlunit=None, model_ws=".", external_path=None, verbose=False, load=True, silent=0, ): # Call constructor for parent object BaseModel.__init__( self, modelname, namefile_ext, exe_name, model_ws, structured=structured, verbose=verbose, ) # Set attributes self.version_types = {"mt3dms": "MT3DMS", "mt3d-usgs": "MT3D-USGS"} self.set_version(version.lower()) if listunit is None: listunit = 16 if ftlunit is None: ftlunit = 10 self.lst = Mt3dList(self, listunit=listunit) self.mf = modflowmodel self.ftlfilename = ftlfilename self.ftlfree = ftlfree self.ftlunit = ftlunit self.free_format = None # Check whether specified ftlfile exists in model directory; if not, # warn user if os.path.isfile( os.path.join(self.model_ws, str(modelname + "." + namefile_ext)) ): with open( os.path.join( self.model_ws, str(modelname + "." + namefile_ext) ) ) as nm_file: for line in nm_file: if line[0:3] == "FTL": ftlfilename = line.strip().split()[2] break if ftlfilename is None: print("User specified FTL file does not exist in model directory") print("MT3D will not work without a linker file") else: if os.path.isfile(os.path.join(self.model_ws, ftlfilename)): # Check that the FTL present in the directory is of the format # specified by the user, i.e., is same as ftlfree # Do this by checking whether the first non-blank character is # an apostrophe. # If code lands here, then ftlfilename exists, open and read # first 4 characters f = open(os.path.join(self.model_ws, ftlfilename), "rb") c = f.read(4) if isinstance(c, bytes): c = c.decode() # if first non-blank char is an apostrophe, then formatted, # otherwise binary if (c.strip()[0] == "'" and self.ftlfree) or ( c.strip()[0] != "'" and not self.ftlfree ): pass else: msg = ( "Specified value of ftlfree conflicts with FTL " + "file format" ) print(msg) msg = ( "Switching ftlfree from " + "{} ".format(str(self.ftlfree)) + "to {}".format(str(not self.ftlfree)) ) print(msg) self.ftlfree = not self.ftlfree # Flip the bool # external option stuff self.array_free_format = False self.array_format = "mt3d" self.external_fnames = [] self.external_units = [] self.external_binflag = [] self.external = False self.load = load # the starting external data unit number self._next_ext_unit = 2000 if external_path is not None: # assert model_ws == '.', "ERROR: external cannot be used " + \ # "with model_ws" # external_path = os.path.join(model_ws, external_path) if os.path.exists(external_path): print( "Note: external_path " + str(external_path) + " already exists" ) # assert os.path.exists(external_path),'external_path does not exist' else: os.mkdir(external_path) self.external = True self.external_path = external_path self.verbose = verbose self.silent = silent # Create a dictionary to map package with package object. # This is used for loading models. self.mfnam_packages = { "btn": Mt3dBtn, "adv": Mt3dAdv, "dsp": Mt3dDsp, "ssm": Mt3dSsm, "rct": Mt3dRct, "gcg": Mt3dGcg, "tob": Mt3dTob, "phc": Mt3dPhc, "lkt": Mt3dLkt, "sft": Mt3dSft, "uzt2": Mt3dUzt, } return def __repr__(self): return "MT3DMS model" @property def modeltime(self): # build model time data_frame = { "perlen": self.mf.dis.perlen.array, "nstp": self.mf.dis.nstp.array, "tsmult": self.mf.dis.tsmult.array, } self._model_time = ModelTime( data_frame, self.mf.dis.itmuni_dict[self.mf.dis.itmuni], self.dis.start_datetime, self.dis.steady.array, ) return self._model_time @property def modelgrid(self): if not self._mg_resync: return self._modelgrid if self.btn is not None: ibound = self.btn.icbund.array delc = self.btn.delc.array delr = self.btn.delr.array top = self.btn.htop.array botm = np.subtract(top, self.btn.dz.array.cumsum(axis=0)) nlay = self.btn.nlay else: delc = self.mf.dis.delc.array delr = self.mf.dis.delr.array top = self.mf.dis.top.array botm = self.mf.dis.botm.array nlay = self.mf.nlay if self.mf.bas6 is not None: ibound = self.mf.bas6.ibound.array else: ibound = None # build grid self._modelgrid = StructuredGrid( delc=delc, delr=delr, top=top, botm=botm, idomain=ibound, proj4=self._modelgrid.proj4, epsg=self._modelgrid.epsg, xoff=self._modelgrid.xoffset, yoff=self._modelgrid.yoffset, angrot=self._modelgrid.angrot, nlay=nlay, ) # resolve offsets xoff = self._modelgrid.xoffset if xoff is None: if self._xul is not None: xoff = self._modelgrid._xul_to_xll(self._xul) else: xoff = self.mf._modelgrid.xoffset if xoff is None: # incase mf._modelgrid.xoffset is not set but mf._xul is if self.mf._xul is not None: xoff = self._modelgrid._xul_to_xll(self.mf._xul) else: xoff = 0.0 yoff = self._modelgrid.yoffset if yoff is None: if self._yul is not None: yoff = self._modelgrid._yul_to_yll(self._yul) else: yoff = self.mf._modelgrid.yoffset if yoff is None: # incase mf._modelgrid.yoffset is not set but mf._yul is if self.mf._yul is not None: yoff = self._modelgrid._yul_to_yll(self.mf._yul) else: yoff = 0.0 proj4 = self._modelgrid.proj4 if proj4 is None: proj4 = self.mf._modelgrid.proj4 epsg = self._modelgrid.epsg if epsg is None: epsg = self.mf._modelgrid.epsg angrot = self._modelgrid.angrot if angrot is None or angrot == 0.0: # angrot normally defaulted to 0.0 if self.mf._modelgrid.angrot is not None: angrot = self.mf._modelgrid.angrot else: angrot = 0.0 self._modelgrid.set_coord_info(xoff, yoff, angrot, epsg, proj4) self._mg_resync = not self._modelgrid.is_complete return self._modelgrid @property def solver_tols(self): if self.gcg is not None: return self.gcg.cclose, -999 return None @property def sr(self): if self.mf is not None: return self.mf.sr return None @property def nlay(self): if self.btn: return self.btn.nlay else: return 0 @property def nrow(self): if self.btn: return self.btn.nrow else: return 0 @property def ncol(self): if self.btn: return self.btn.ncol else: return 0 @property def nper(self): if self.btn: return self.btn.nper else: return 0 @property def ncomp(self): if self.btn: return self.btn.ncomp else: return 1 @property def mcomp(self): if self.btn: return self.btn.mcomp else: return 1
[docs] def get_nrow_ncol_nlay_nper(self): if self.btn: return self.btn.nrow, self.btn.ncol, self.btn.nlay, self.btn.nper else: return 0, 0, 0, 0
# Property has no setter, so read-only nrow_ncol_nlay_nper = property(get_nrow_ncol_nlay_nper)
[docs] def write_name_file(self): """ Write the name file. """ fn_path = os.path.join(self.model_ws, self.namefile) f_nam = open(fn_path, "w") f_nam.write("{}\n".format(self.heading)) f_nam.write( "{:14s} {:5d} {}\n".format( self.lst.name[0], self.lst.unit_number[0], self.lst.file_name[0], ) ) if self.ftlfilename is not None: ftlfmt = "" if self.ftlfree: ftlfmt = "FREE" f_nam.write( "{:14s} {:5d} {} {}\n".format( "FTL", self.ftlunit, self.ftlfilename, ftlfmt ) ) # write file entries in name file f_nam.write("{}".format(self.get_name_file_entries())) # write the external files for u, f in zip(self.external_units, self.external_fnames): f_nam.write("DATA {0:5d} ".format(u) + f + "\n") # write the output files for u, f, b in zip( self.output_units, self.output_fnames, self.output_binflag ): if u == 0: continue if b: f_nam.write( "DATA(BINARY) {0:5d} ".format(u) + f + " REPLACE\n" ) else: f_nam.write("DATA {0:5d} ".format(u) + f + "\n") f_nam.close() return
[docs] def load_results(self, **kwargs): return
[docs] @classmethod def load( cls, f, version="mt3dms", exe_name="mt3dms.exe", verbose=False, model_ws=".", load_only=None, forgive=False, modflowmodel=None, ): """ Load an existing model. Parameters ---------- f : string Full path and name of MT3D name file. version : string The version of MT3D (mt3dms, or mt3d-usgs) (default is mt3dms) exe_name : string The name of the executable to use if this loaded model is run. (default is mt3dms.exe) verbose : bool Write information on the load process if True. (default is False) model_ws : string The path for the model workspace. (default is the current working directory '.') load_only : list of strings Filetype(s) to load (e.g. ['btn', 'adv']) (default is None, which means that all will be loaded) forgive : bool, optional Option to raise exceptions on package load failure, which can be useful for debugging. Default False. modflowmodel : flopy.modflow.mf.Modflow This is a flopy Modflow model object upon which this Mt3dms model is based. (the default is None) Returns ------- mt : flopy.mt3d.mt.Mt3dms flopy Mt3d model object Notes ----- The load method does not retain the name for the MODFLOW-generated FTL file. This can be added manually after the MT3D model has been loaded. The syntax for doing this manually is mt.ftlfilename = 'example.ftl' Examples -------- >>> import flopy >>> f = 'example.nam' >>> mt = flopy.mt3d.mt.Mt3dms.load(f) >>> mt.ftlfilename = 'example.ftl' """ modelname, ext = os.path.splitext(f) modelname_extension = ext[1:] # without '.' if verbose: sys.stdout.write( "\nCreating new model with name: {}\n{}\n\n".format( modelname, 50 * "-" ) ) mt = cls( modelname=modelname, namefile_ext=modelname_extension, version=version, exe_name=exe_name, verbose=verbose, model_ws=model_ws, modflowmodel=modflowmodel, ) files_successfully_loaded = [] files_not_loaded = [] # read name file namefile_path = os.path.join(mt.model_ws, f) if not os.path.isfile(namefile_path): raise IOError("cannot find name file: " + str(namefile_path)) try: ext_unit_dict = mfreadnam.parsenamefile( namefile_path, mt.mfnam_packages, verbose=verbose ) except Exception as e: # print("error loading name file entries from file") # print(str(e)) # return None raise Exception( "error loading name file entries from file:\n" + str(e) ) if mt.verbose: print( "\n{}\nExternal unit dictionary:\n{}\n{}\n".format( 50 * "-", ext_unit_dict, 50 * "-" ) ) # reset unit number for list file unitnumber = None for key, value in ext_unit_dict.items(): if value.filetype == "LIST": unitnumber = key filepth = os.path.basename(value.filename) if unitnumber == "LIST": unitnumber = 16 if unitnumber is not None: mt.lst.unit_number = [unitnumber] mt.lst.file_name = [filepth] # set ftl information unitnumber = None for key, value in ext_unit_dict.items(): if value.filetype == "FTL": unitnumber = key filepth = os.path.basename(value.filename) if unitnumber == "FTL": unitnumber = 10 if unitnumber is not None: mt.ftlunit = unitnumber mt.ftlfilename = filepth # load btn btn = None btn_key = None for key, item in ext_unit_dict.items(): if item.filetype.lower() == "btn": btn = item btn_key = key break if btn is None: return None try: pck = btn.package.load( btn.filename, mt, ext_unit_dict=ext_unit_dict ) except Exception as e: raise Exception("error loading BTN: {0}".format(str(e))) files_successfully_loaded.append(btn.filename) if mt.verbose: sys.stdout.write( " {:4s} package load...success\n".format(pck.name[0]) ) ext_unit_dict.pop(btn_key).filehandle.close() ncomp = mt.btn.ncomp # reserved unit numbers for .ucn, s.ucn, .obs, .mas, .cnf poss_output_units = set( list(range(201, 201 + ncomp)) + list(range(301, 301 + ncomp)) + list(range(401, 401 + ncomp)) + list(range(601, 601 + ncomp)) + [17] ) if load_only is None: load_only = [] for key, item in ext_unit_dict.items(): load_only.append(item.filetype) else: if not isinstance(load_only, list): load_only = [load_only] not_found = [] for i, filetype in enumerate(load_only): filetype = filetype.upper() if filetype != "BTN": load_only[i] = filetype found = False for key, item in ext_unit_dict.items(): if item.filetype == filetype: found = True break if not found: not_found.append(filetype) if len(not_found) > 0: raise Exception( "the following load_only entries were not found " "in the ext_unit_dict: " + ",".join(not_found) ) # try loading packages in ext_unit_dict for key, item in ext_unit_dict.items(): if item.package is not None: if item.filetype in load_only: if forgive: try: pck = item.package.load( item.filehandle, mt, ext_unit_dict=ext_unit_dict, ) files_successfully_loaded.append(item.filename) if mt.verbose: sys.stdout.write( " {:4s} package load...success\n".format( pck.name[0] ) ) except BaseException as o: if mt.verbose: sys.stdout.write( " {:4s} package load...failed\n {!s}\n".format( item.filetype, o ) ) files_not_loaded.append(item.filename) else: pck = item.package.load( item.filehandle, mt, ext_unit_dict=ext_unit_dict ) files_successfully_loaded.append(item.filename) if mt.verbose: sys.stdout.write( " {:4s} package load...success\n".format( pck.name[0] ) ) else: if mt.verbose: sys.stdout.write( " {:4s} package load...skipped\n".format( item.filetype ) ) files_not_loaded.append(item.filename) elif "data" not in item.filetype.lower(): files_not_loaded.append(item.filename) if mt.verbose: sys.stdout.write( " {:4s} package load...skipped\n".format( item.filetype ) ) elif "data" in item.filetype.lower(): if mt.verbose: sys.stdout.write( " {} file load...skipped\n {}\n".format( item.filetype, os.path.basename(item.filename) ) ) if key in poss_output_units: # id files specified to output unit numbers and allow to # pass through mt.output_fnames.append(os.path.basename(item.filename)) mt.output_units.append(key) mt.output_binflag.append("binary" in item.filetype.lower()) elif key not in mt.pop_key_list: mt.external_fnames.append(item.filename) mt.external_units.append(key) mt.external_binflag.append( "binary" in item.filetype.lower() ) mt.external_output.append(False) # pop binary output keys and any external file units that are now # internal for key in mt.pop_key_list: try: mt.remove_external(unit=key) item = ext_unit_dict.pop(key) if hasattr(item.filehandle, "close"): item.filehandle.close() except KeyError: if mt.verbose: msg = ( "\nWARNING:\n External file unit " + "{} does not exist in ext_unit_dict.\n".format(key) ) sys.stdout.write(msg) # write message indicating packages that were successfully loaded if mt.verbose: print(1 * "\n") s = " The following {0} packages were successfully loaded.".format( len(files_successfully_loaded) ) print(s) for fname in files_successfully_loaded: print(" " + os.path.basename(fname)) if len(files_not_loaded) > 0: s = " The following {0} packages were not loaded.".format( len(files_not_loaded) ) print(s) for fname in files_not_loaded: print(" " + os.path.basename(fname)) print("\n") # return model object return mt
[docs] @staticmethod def load_mas(fname): """ Load an mt3d mas file and return a numpy recarray Parameters ---------- fname : str name of MT3D mas file Returns ------- r : np.ndarray """ if not os.path.isfile(fname): raise Exception("Could not find file: {}".format(fname)) dtype = [ ("time", float), ("total_in", float), ("total_out", float), ("sources", float), ("sinks", float), ("fluid_storage", float), ("total_mass", float), ("error_in-out", float), ("error_alt", float), ] r = np.loadtxt(fname, skiprows=2, dtype=dtype) r = r.view(np.recarray) return r
[docs] @staticmethod def load_obs(fname): """ Load an mt3d obs file and return a numpy recarray Parameters ---------- fname : str name of MT3D obs file Returns ------- r : np.ndarray """ firstline = "STEP TOTAL TIME LOCATION OF OBSERVATION POINTS (K,I,J)" dtype = [("step", int), ("time", float)] nobs = 0 obs = [] if not os.path.isfile(fname): raise Exception("Could not find file: {}".format(fname)) with open(fname, "r") as f: line = f.readline() if line.strip() != firstline: msg = "First line in file must be \n{}\nFound {}".format( firstline, line.strip() ) msg += ( "\n{} does not appear to be a valid MT3D OBS file".format( fname ) ) raise Exception(msg) # Read obs names (when break, line will have first data line) nlineperrec = 0 while True: line = f.readline() if line[0:7].strip() == "1": break nlineperrec += 1 ll = line.strip().split() while len(ll) > 0: k = int(ll.pop(0)) i = int(ll.pop(0)) j = int(ll.pop(0)) obsnam = "({}, {}, {})".format(k, i, j) if obsnam in obs: obsnam += str(len(obs) + 1) # make obs name unique obs.append(obsnam) icount = 0 r = [] while True: ll = [] for n in range(nlineperrec): icount += 1 if icount > 1: line = f.readline() ll.extend(line.strip().split()) if not line: break rec = [int(ll[0])] for val in ll[1:]: rec.append(float(val)) r.append(tuple(rec)) # add obs names to dtype for nameob in obs: dtype.append((nameob, float)) r = np.array(r, dtype=dtype) r = r.view(np.recarray) return r