Source code for flopy.mt3d.mt

import os

import numpy as np

from ..discretization.modeltime import ModelTime
from ..discretization.structuredgrid import StructuredGrid
from ..mbase import BaseModel
from ..pakbase import Package
from ..utils import mfreadnam
from .mtadv import Mt3dAdv
from .mtbtn import Mt3dBtn
from .mtdsp import Mt3dDsp
from .mtgcg import Mt3dGcg
from .mtlkt import Mt3dLkt
from .mtphc import Mt3dPhc
from .mtrct import Mt3dRct
from .mtsft import Mt3dSft
from .mtssm import Mt3dSsm
from .mttob import Mt3dTob
from .mtuzt import Mt3dUzt


[docs]class Mt3dList(Package): """ List package class """ def __init__(self, model, extension="list", listunit=7): # call base package constructor super().__init__(model, extension, "LIST", listunit) # self.parent.add_package(self) This package is not added to the base # model so that it is not included in get_name_file_entries() return def __repr__(self): return "List package class"
[docs] def write_file(self): # Not implemented for list class return
[docs]class Mt3dms(BaseModel): """ MT3DMS Model Class. Parameters ---------- modelname : str, default "mt3dtest" Name of model. This string will be used to name the MODFLOW input that are created with write_model. namefile_ext : str, default "nam" Extension for the namefile. modflowmodel : flopy.modflow.mf.Modflow This is a flopy Modflow model object upon which this Mt3dms model is based. ftlfilename : str, default "mt3d_link.ftl" Name of flow-transport link file. ftlfree : TYPE, default False If flow-link transport file is formatted (True) or unformatted (False, default). version : str, default "mt3dms" Mt3d version. Choose one of: "mt3dms" (default) or "mt3d-usgs". exe_name : str, default "mt3dms" The name of the executable to use. structured : bool, default True Specify if model grid is structured (default) or unstructured. listunit : int, default 16 Unit number for the list file. ftlunit : int, default 10 Unit number for flow-transport link file. model_ws : str, optional Model workspace. Directory name to create model data sets. Default is the present working directory. external_path : str, optional Location for external files. verbose : bool, default False Print additional information to the screen. load : bool, default True Load model. silent : int, default 0 Silent option. Attributes ---------- Methods ------- See Also -------- Notes ----- Examples -------- >>> import flopy >>> m = flopy.mt3d.mt.Mt3dms() """ def __init__( self, modelname="mt3dtest", namefile_ext="nam", modflowmodel=None, ftlfilename="mt3d_link.ftl", ftlfree=False, version="mt3dms", exe_name="mt3dms", structured=True, listunit=16, ftlunit=10, model_ws=".", external_path=None, verbose=False, load=True, silent=0, ): super().__init__( modelname, namefile_ext, exe_name, model_ws, structured=structured, verbose=verbose, ) # Set attributes self.version_types = {"mt3dms": "MT3DMS", "mt3d-usgs": "MT3D-USGS"} self.set_version(version.lower()) if listunit is None: listunit = 16 if ftlunit is None: ftlunit = 10 self.lst = Mt3dList(self, listunit=listunit) self.mf = modflowmodel self.ftlfilename = ftlfilename self.ftlfree = ftlfree self.ftlunit = ftlunit self.free_format = None # Check whether specified ftlfile exists in model directory; if not, # warn user if os.path.isfile( os.path.join(self.model_ws, f"{modelname}.{namefile_ext}") ): with open( os.path.join(self.model_ws, f"{modelname}.{namefile_ext}") ) as nm_file: for line in nm_file: if line[0:3] == "FTL": ftlfilename = line.strip().split()[2] break if ftlfilename is None: print("User specified FTL file does not exist in model directory") print("MT3D will not work without a linker file") else: if os.path.isfile(os.path.join(self.model_ws, ftlfilename)): # Check that the FTL present in the directory is of the format # specified by the user, i.e., is same as ftlfree # Do this by checking whether the first non-blank character is # an apostrophe. # If code lands here, then ftlfilename exists, open and read # first 4 characters f = open(os.path.join(self.model_ws, ftlfilename), "rb") c = f.read(4) if isinstance(c, bytes): c = c.decode() # if first non-blank char is an apostrophe, then formatted, # otherwise binary if (c.strip()[0] == "'" and self.ftlfree) or ( c.strip()[0] != "'" and not self.ftlfree ): pass else: print( "Specified value of ftlfree conflicts with FTL " "file format" ) print( f"Switching ftlfree from {self.ftlfree} to {not self.ftlfree}" ) self.ftlfree = not self.ftlfree # Flip the bool # external option stuff self.array_free_format = False self.array_format = "mt3d" self.external_fnames = [] self.external_units = [] self.external_binflag = [] self.external = False self.load = load # the starting external data unit number self._next_ext_unit = 2000 if external_path is not None: # assert model_ws == '.', "ERROR: external cannot be used " + \ # "with model_ws" # external_path = os.path.join(model_ws, external_path) if os.path.exists(external_path): print(f"Note: external_path {external_path} already exists") # assert os.path.exists(external_path),'external_path does not exist' else: os.mkdir(external_path) self.external = True self.external_path = external_path self.verbose = verbose self.silent = silent # Create a dictionary to map package with package object. # This is used for loading models. self.mfnam_packages = { "btn": Mt3dBtn, "adv": Mt3dAdv, "dsp": Mt3dDsp, "ssm": Mt3dSsm, "rct": Mt3dRct, "gcg": Mt3dGcg, "tob": Mt3dTob, "phc": Mt3dPhc, "lkt": Mt3dLkt, "sft": Mt3dSft, "uzt2": Mt3dUzt, } return def __repr__(self): return "MT3DMS model" @property def modeltime(self): # build model time data_frame = { "perlen": self.mf.dis.perlen.array, "nstp": self.mf.dis.nstp.array, "tsmult": self.mf.dis.tsmult.array, } self._model_time = ModelTime( data_frame, self.mf.dis.itmuni_dict[self.mf.dis.itmuni], self.dis.start_datetime, self.dis.steady.array, ) return self._model_time @property def modelgrid(self): if not self._mg_resync: return self._modelgrid if self.btn is not None: ibound = self.btn.icbund.array delc = self.btn.delc.array delr = self.btn.delr.array top = self.btn.htop.array botm = np.subtract(top, self.btn.dz.array.cumsum(axis=0)) nlay = self.btn.nlay else: delc = self.mf.dis.delc.array delr = self.mf.dis.delr.array top = self.mf.dis.top.array botm = self.mf.dis.botm.array nlay = self.mf.nlay if self.mf.bas6 is not None: ibound = self.mf.bas6.ibound.array else: ibound = None # build grid self._modelgrid = StructuredGrid( delc=delc, delr=delr, top=top, botm=botm, idomain=ibound, crs=self._modelgrid.crs, xoff=self._modelgrid.xoffset, yoff=self._modelgrid.yoffset, angrot=self._modelgrid.angrot, nlay=nlay, ) # resolve offsets xoff = self._modelgrid.xoffset if xoff is None: if self._xul is not None: xoff = self._modelgrid._xul_to_xll(self._xul) else: xoff = self.mf._modelgrid.xoffset if xoff is None: # in case mf._modelgrid.xoffset is not set but mf._xul is if self.mf._xul is not None: xoff = self._modelgrid._xul_to_xll(self.mf._xul) else: xoff = 0.0 yoff = self._modelgrid.yoffset if yoff is None: if self._yul is not None: yoff = self._modelgrid._yul_to_yll(self._yul) else: yoff = self.mf._modelgrid.yoffset if yoff is None: # in case mf._modelgrid.yoffset is not set but mf._yul is if self.mf._yul is not None: yoff = self._modelgrid._yul_to_yll(self.mf._yul) else: yoff = 0.0 crs = self._modelgrid.crs if crs is None: crs = self.mf._modelgrid.crs angrot = self._modelgrid.angrot if angrot is None or angrot == 0.0: # angrot normally defaulted to 0.0 if self.mf._modelgrid.angrot is not None: angrot = self.mf._modelgrid.angrot else: angrot = 0.0 self._modelgrid.set_coord_info(xoff, yoff, angrot, crs=crs) self._mg_resync = not self._modelgrid.is_complete return self._modelgrid @property def solver_tols(self): if self.gcg is not None: return self.gcg.cclose, -999 return None @property def nlay(self): if self.btn: return self.btn.nlay else: return 0 @property def nrow(self): if self.btn: return self.btn.nrow else: return 0 @property def ncol(self): if self.btn: return self.btn.ncol else: return 0 @property def nper(self): if self.btn: return self.btn.nper else: return 0 @property def ncomp(self): if self.btn: return self.btn.ncomp else: return 1 @property def mcomp(self): if self.btn: return self.btn.mcomp else: return 1
[docs] def get_nrow_ncol_nlay_nper(self): if self.btn: return self.btn.nrow, self.btn.ncol, self.btn.nlay, self.btn.nper else: return 0, 0, 0, 0
# Property has no setter, so read-only nrow_ncol_nlay_nper = property(get_nrow_ncol_nlay_nper)
[docs] def write_name_file(self): """ Write the name file. """ fn_path = os.path.join(self.model_ws, self.namefile) f_nam = open(fn_path, "w") f_nam.write(f"{self.heading}\n") f_nam.write( "{:14s} {:5d} {}\n".format( self.lst.name[0], self.lst.unit_number[0], self.lst.file_name[0], ) ) if self.ftlfilename is not None: ftlfmt = "" if self.ftlfree: ftlfmt = "FREE" f_nam.write( f"{'FTL':14s} {self.ftlunit:5d} {self.ftlfilename} {ftlfmt}\n" ) # write file entries in name file f_nam.write(str(self.get_name_file_entries())) # write the external files for u, f in zip(self.external_units, self.external_fnames): f_nam.write(f"DATA {u:5d} {f}\n") # write the output files for u, f, b in zip( self.output_units, self.output_fnames, self.output_binflag ): if u == 0: continue if b: f_nam.write(f"DATA(BINARY) {u:5d} {f} REPLACE\n") else: f_nam.write(f"DATA {u:5d} {f}\n") f_nam.close() return
[docs] def load_results(self, **kwargs): return
[docs] @classmethod def load( cls, f, version="mt3dms", exe_name="mt3dms", verbose=False, model_ws=".", load_only=None, forgive=False, modflowmodel=None, ): """ Load an existing model. Parameters ---------- f : str Path to MT3D name file to load. version : str, default "mt3dms" Mt3d version. Choose one of: "mt3dms" (default) or "mt3d-usgs". exe_name : str, default "mt3dms" The name of the executable to use. verbose : bool, default False Print information on the load process if True. model_ws : str, default "." Model workspace path. Default is the current directory. load_only : list of str, optional Packages to load (e.g. ['btn', 'adv']). Default None means that all packages will be loaded. forgive : bool, default False Option to raise exceptions on package load failure, which can be useful for debugging. modflowmodel : flopy.modflow.mf.Modflow, optional This is a flopy Modflow model object upon which this Mt3dms model is based. Returns ------- flopy.mt3d.mt.Mt3dms Notes ----- The load method does not retain the name for the MODFLOW-generated FTL file. This can be added manually after the MT3D model has been loaded. The syntax for doing this manually is ``mt.ftlfilename = 'example.ftl'``. Examples -------- >>> import flopy >>> mt = flopy.mt3d.mt.Mt3dms.load('example.nam') >>> mt.ftlfilename = 'example.ftl' """ modelname, ext = os.path.splitext(f) modelname_extension = ext[1:] # without '.' if verbose: print(f"\nCreating new model with name: {modelname}\n{50 * '-'}\n") mt = cls( modelname=modelname, namefile_ext=modelname_extension, version=version, exe_name=exe_name, verbose=verbose, model_ws=model_ws, modflowmodel=modflowmodel, ) files_successfully_loaded = [] files_not_loaded = [] # read name file namefile_path = os.path.join(mt.model_ws, f) if not os.path.isfile(namefile_path): raise FileNotFoundError(f"cannot find name file: {namefile_path}") try: ext_unit_dict = mfreadnam.parsenamefile( namefile_path, mt.mfnam_packages, verbose=verbose ) except Exception as e: # print("error loading name file entries from file") # print(str(e)) # return None raise Exception( f"error loading name file entries from file:\n{e!s}" ) if mt.verbose: print( "\n{}\nExternal unit dictionary:\n{}\n{}\n".format( 50 * "-", ext_unit_dict, 50 * "-" ) ) # reset unit number for list file unitnumber = None for key, value in ext_unit_dict.items(): if value.filetype == "LIST": unitnumber = key filepth = os.path.basename(value.filename) if unitnumber == "LIST": unitnumber = 16 if unitnumber is not None: mt.lst.unit_number = [unitnumber] mt.lst.file_name = [filepth] # set ftl information unitnumber = None for key, value in ext_unit_dict.items(): if value.filetype == "FTL": unitnumber = key filepth = os.path.basename(value.filename) if unitnumber == "FTL": unitnumber = 10 if unitnumber is not None: mt.ftlunit = unitnumber mt.ftlfilename = filepth # load btn btn = None btn_key = None for key, item in ext_unit_dict.items(): if item.filetype.lower() == "btn": btn = item btn_key = key break if btn is None: return None try: pck = btn.package.load( btn.filename, mt, ext_unit_dict=ext_unit_dict ) except Exception as e: raise Exception(f"error loading BTN: {e!s}") files_successfully_loaded.append(btn.filename) if mt.verbose: print(f" {pck.name[0]:4s} package load...success") ext_unit_dict.pop(btn_key).filehandle.close() ncomp = mt.btn.ncomp # reserved unit numbers for .ucn, s.ucn, .obs, .mas, .cnf poss_output_units = set( list(range(201, 201 + ncomp)) + list(range(301, 301 + ncomp)) + list(range(401, 401 + ncomp)) + list(range(601, 601 + ncomp)) + [17] ) if load_only is None: load_only = [] for key, item in ext_unit_dict.items(): load_only.append(item.filetype) else: if not isinstance(load_only, list): load_only = [load_only] not_found = [] for i, filetype in enumerate(load_only): filetype = filetype.upper() if filetype != "BTN": load_only[i] = filetype found = False for key, item in ext_unit_dict.items(): if item.filetype == filetype: found = True break if not found: not_found.append(filetype) if len(not_found) > 0: raise Exception( "the following load_only entries were not found " "in the ext_unit_dict: " + ",".join(not_found) ) # try loading packages in ext_unit_dict for key, item in ext_unit_dict.items(): if item.package is not None: if item.filetype in load_only: if forgive: try: pck = item.package.load( item.filehandle, mt, ext_unit_dict=ext_unit_dict, ) files_successfully_loaded.append(item.filename) if mt.verbose: print( f" {pck.name[0]:4s} package load...success" ) except BaseException as o: if mt.verbose: print( f" {item.filetype:4s} package load" f"...failed\n {o!s}" ) files_not_loaded.append(item.filename) else: pck = item.package.load( item.filehandle, mt, ext_unit_dict=ext_unit_dict ) files_successfully_loaded.append(item.filename) if mt.verbose: print( f" {pck.name[0]:4s} package load...success" ) else: if mt.verbose: print(f" {item.filetype:4s} package load...skipped") files_not_loaded.append(item.filename) elif "data" not in item.filetype.lower(): files_not_loaded.append(item.filename) if mt.verbose: print(f" {item.filetype:4s} package load...skipped") elif "data" in item.filetype.lower(): if mt.verbose: print( " {} file load...skipped\n {}".format( item.filetype, os.path.basename(item.filename) ) ) if key in poss_output_units: # id files specified to output unit numbers and allow to # pass through mt.output_fnames.append(os.path.basename(item.filename)) mt.output_units.append(key) mt.output_binflag.append("binary" in item.filetype.lower()) elif key not in mt.pop_key_list: mt.external_fnames.append(item.filename) mt.external_units.append(key) mt.external_binflag.append( "binary" in item.filetype.lower() ) mt.external_output.append(False) # pop binary output keys and any external file units that are now # internal for key in mt.pop_key_list: try: mt.remove_external(unit=key) item = ext_unit_dict.pop(key) if hasattr(item.filehandle, "close"): item.filehandle.close() except KeyError: if mt.verbose: print( "\nWARNING:\n External file unit " f"{key} does not exist in ext_unit_dict." ) # write message indicating packages that were successfully loaded if mt.verbose: print( "\n The following {} packages were " "successfully loaded.".format(len(files_successfully_loaded)) ) for fname in files_successfully_loaded: print(f" {os.path.basename(fname)}") if len(files_not_loaded) > 0: print( f" The following {len(files_not_loaded)} packages " "were not loaded." ) for fname in files_not_loaded: print(f" {os.path.basename(fname)}") print("\n") # return model object return mt
[docs] @staticmethod def load_mas(fname): """ Load an mt3d mas file and return a numpy recarray Parameters ---------- fname : str name of MT3D mas file Returns ------- r : np.ndarray """ if not os.path.isfile(fname): raise Exception(f"Could not find file: {fname}") dtype = [ ("time", float), ("total_in", float), ("total_out", float), ("sources", float), ("sinks", float), ("fluid_storage", float), ("total_mass", float), ("error_in-out", float), ("error_alt", float), ] r = np.loadtxt(fname, skiprows=2, dtype=dtype) r = r.view(np.recarray) return r
[docs] @staticmethod def load_obs(fname): """ Load an mt3d obs file and return a numpy recarray Parameters ---------- fname : str name of MT3D obs file Returns ------- r : np.ndarray """ firstline = "STEP TOTAL TIME LOCATION OF OBSERVATION POINTS (K,I,J)" dtype = [("step", int), ("time", float)] nobs = 0 obs = [] if not os.path.isfile(fname): raise Exception(f"Could not find file: {fname}") with open(fname) as f: line = f.readline() if line.strip() != firstline: raise Exception( "First line in file must be \n{}\nFound {}\n" "{} does not appear to be a valid MT3D OBS file".format( firstline, line.strip(), fname ) ) # Read obs names (when break, line will have first data line) nlineperrec = 0 while True: line = f.readline() if line[0:7].strip() == "1": break nlineperrec += 1 ll = line.strip().split() while len(ll) > 0: k = int(ll.pop(0)) i = int(ll.pop(0)) j = int(ll.pop(0)) obsnam = f"({k}, {i}, {j})" if obsnam in obs: obsnam += str(len(obs) + 1) # make obs name unique obs.append(obsnam) icount = 0 r = [] while True: ll = [] for n in range(nlineperrec): icount += 1 if icount > 1: line = f.readline() ll.extend(line.strip().split()) if not line: break rec = [int(ll[0])] for val in ll[1:]: rec.append(float(val)) r.append(tuple(rec)) # add obs names to dtype for nameob in obs: dtype.append((nameob, float)) r = np.array(r, dtype=dtype) r = r.view(np.recarray) return r