import errno
import inspect
import os.path
import sys
import numpy as np
from ...mbase import run_model
from ..data import mfstructure
from ..data.mfdatautil import MFComment
from ..data.mfstructure import DatumType
from ..mfbase import (
ExtFileAction,
FlopyException,
MFDataException,
MFFileMgmt,
PackageContainer,
PackageContainerType,
VerbosityLevel,
)
from ..mfpackage import MFPackage
from ..modflow import mfgwfgnc, mfgwfmvr, mfgwtmvt, mfims, mfnam, mftdis
from ..utils import binaryfile_utils, mfobservation
[docs]class SimulationDict(dict):
"""
Class containing custom dictionary for MODFLOW simulations. Dictionary
contains model data. Dictionary keys are "paths" to the data that include
the model and package containing the data.
Behaves as an dict with some additional features described below.
Parameters
----------
path : MFFileMgmt
Object containing path information for the simulation
"""
def __init__(self, path=None):
dict.__init__(self)
self._path = path
def __getitem__(self, key):
"""
Define the __getitem__ magic method.
Parameters
----------
key (string): Part or all of a dictionary key
Returns:
MFData or numpy.ndarray
"""
if key == "_path" or not hasattr(self, "_path"):
raise AttributeError(key)
# FIX: Transport - Include transport output files
if key[1] in ("CBC", "HDS", "DDN", "UCN"):
val = binaryfile_utils.MFOutput(self, self._path, key)
return val.data
elif key[-1] == "Observations":
val = mfobservation.MFObservation(self, self._path, key)
return val.data
if key in self:
val = dict.__getitem__(self, key)
return val
return AttributeError(key)
def __setitem__(self, key, val):
"""
Define the __setitem__ magic method.
Parameters
----------
key : str
Dictionary key
val : MFData
MFData to store in dictionary
"""
dict.__setitem__(self, key, val)
[docs] def find_in_path(self, key_path, key_leaf):
"""
Attempt to find key_leaf in a partial key path key_path.
Parameters
----------
key_path : str
partial path to the data
key_leaf : str
name of the data
Returns
-------
Data: MFData,
index: int
"""
key_path_size = len(key_path)
for key, item in self.items():
if key[:key_path_size] == key_path:
if key[-1] == key_leaf:
# found key_leaf as a key in the dictionary
return item, None
if not isinstance(item, MFComment):
data_item_index = 0
data_item_structures = item.structure.data_item_structures
for data_item_struct in data_item_structures:
if data_item_struct.name == key_leaf:
# found key_leaf as a data item name in the data
# in the dictionary
return item, data_item_index
if data_item_struct.type != DatumType.keyword:
data_item_index += 1
return None, None
[docs] def output_keys(self, print_keys=True):
"""
Return a list of output data keys supported by the dictionary.
Parameters
----------
print_keys : bool
print keys to console
Returns
-------
output keys : list
"""
# get keys to request binary output
x = binaryfile_utils.MFOutputRequester.getkeys(
self, self._path, print_keys=print_keys
)
return [key for key in x.dataDict]
[docs] def observation_keys(self):
"""
Return a list of observation keys.
Returns
-------
observation keys : list
"""
# get keys to request observation file output
mfobservation.MFObservationRequester.getkeys(self, self._path)
[docs] def keys(self):
"""
Return a list of all keys.
Returns
-------
all keys : list
"""
# overrides the built in keys to print all keys, input and output
self.input_keys()
try:
self.output_keys()
except OSError as e:
if e.errno == errno.EEXIST:
pass
try:
self.observation_keys()
except KeyError:
pass
[docs]class MFSimulationData:
"""
Class containing MODFLOW simulation data and file formatting data. Use
MFSimulationData to set simulation-wide settings which include data
formatting and file location settings.
Parameters
----------
path : str
path on disk to the simulation
Attributes
----------
indent_string : str
String used to define how much indent to use (file formatting)
internal_formatting : list
List defining string to use for internal formatting
external_formatting : list
List defining string to use for external formatting
open_close_formatting : list
List defining string to use for open/close
max_columns_of_data : int
Maximum columns of data before line wraps. For structured grids this
is set to ncol by default. For all other grids the default is 20.
wrap_multidim_arrays : bool
Whether to wrap line for multi-dimensional arrays at the end of a
row/column/layer
float_precision : int
Number of decimal points to write for a floating point number
float_characters : int
Number of characters a floating point number takes up
write_headers: bool
When true flopy writes a header to each package file indicating that
it was created by flopy
sci_note_upper_thres : float
Numbers greater than this threshold are written in scientific notation
sci_note_lower_thres : float
Numbers less than this threshold are written in scientific notation
mfpath : MFFileMgmt
File path location information for the simulation
model_dimensions : dict
Dictionary containing discretization information for each model
mfdata : SimulationDict
Custom dictionary containing all model data for the simulation
"""
def __init__(self, path, mfsim):
# --- formatting variables ---
self.indent_string = " "
self.constant_formatting = ["constant", ""]
self._max_columns_of_data = 20
self.wrap_multidim_arrays = True
self.float_precision = 8
self.float_characters = 15
self.write_headers = True
self._sci_note_upper_thres = 100000
self._sci_note_lower_thres = 0.001
self.fast_write = True
self.comments_on = False
self.auto_set_sizes = True
self.verify_data = True
self.debug = False
self.verbose = True
self.verbosity_level = VerbosityLevel.normal
self.max_columns_user_set = False
self.max_columns_auto_set = False
self._update_str_format()
# --- file path ---
self.mfpath = MFFileMgmt(path, mfsim)
# --- ease of use variables to make working with modflow input and
# output data easier --- model dimension class for each model
self.model_dimensions = {}
# --- model data ---
self.mfdata = SimulationDict(self.mfpath)
# --- temporary variables ---
# other external files referenced
self.referenced_files = {}
@property
def max_columns_of_data(self):
return self._max_columns_of_data
@max_columns_of_data.setter
def max_columns_of_data(self, val):
if not self.max_columns_user_set and (
not self.max_columns_auto_set or val > self._max_columns_of_data
):
self._max_columns_of_data = val
self.max_columns_user_set = True
[docs] def set_sci_note_upper_thres(self, value):
"""
Sets threshold number where any number larger than threshold
is represented in scientific notation.
Parameters
----------
value: float
threshold value
"""
self._sci_note_upper_thres = value
self._update_str_format()
[docs] def set_sci_note_lower_thres(self, value):
"""
Sets threshold number where any number smaller than threshold
is represented in scientific notation.
Parameters
----------
value: float
threshold value
"""
self._sci_note_lower_thres = value
self._update_str_format()
def _update_str_format(self):
"""
Update floating point formatting strings."""
self.reg_format_str = f"{{:.{self.float_precision}E}}"
self.sci_format_str = (
f"{{:{self.float_characters}.{self.float_precision}f}}"
)
[docs]class MFSimulation(PackageContainer):
"""
Entry point into any MODFLOW simulation.
MFSimulation is used to load, build, and/or save a MODFLOW 6 simulation.
A MFSimulation object must be created before creating any of the MODFLOW 6
model objects.
Parameters
----------
sim_name : str
Name of the simulation.
version : str
Version of MODFLOW 6 executable
exe_name : str
Relative path to MODFLOW 6 executable from the simulation
working folder.
sim_ws : str
Path to MODFLOW 6 simulation working folder. This is the folder
containing the simulation name file.
verbosity_level : int
Verbosity level of standard output from 0 to 2. When 0 is specified no
standard output is written. When 1 is specified standard
error/warning messages with some informational messages are written.
When 2 is specified full error/warning/informational messages are
written (this is ideal for debugging).
continue_ : bool
Sets the continue option in the simulation name file. The continue
option is a keyword flag to indicate that the simulation should
continue even if one or more solutions do not converge.
nocheck : bool
Sets the nocheck option in the simulation name file. The nocheck
option is a keyword flag to indicate that the model input check
routines should not be called prior to each time step. Checks
are performed by default.
memory_print_option : str
Sets memory_print_option in the simulation name file.
Memory_print_option is a flag that controls printing of detailed
memory manager usage to the end of the simulation list file. NONE
means do not print detailed information. SUMMARY means print only
the total memory for each simulation component. ALL means print
information for each variable stored in the memory manager. NONE is
default if memory_print_option is not specified.
write_headers: bool
When true flopy writes a header to each package file indicating that
it was created by flopy.
Examples
--------
>>> s = MFSimulation.load('my simulation', 'simulation.nam')
Attributes
----------
sim_name : str
Name of the simulation
name_file : MFPackage
Simulation name file package
"""
def __init__(
self,
sim_name="sim",
version="mf6",
exe_name="mf6.exe",
sim_ws=".",
verbosity_level=1,
continue_=None,
nocheck=None,
memory_print_option=None,
write_headers=True,
):
super().__init__(MFSimulationData(sim_ws, self), sim_name)
self.simulation_data.verbosity_level = self._resolve_verbosity_level(
verbosity_level
)
self.simulation_data.write_headers = write_headers
# verify metadata
fpdata = mfstructure.MFStructure()
if not fpdata.valid:
excpt_str = (
"Invalid package metadata. Unable to load MODFLOW "
"file structure metadata."
)
raise FlopyException(excpt_str)
# initialize
self.dimensions = None
self.type = "Simulation"
self.version = version
self.exe_name = exe_name
self._models = {}
self._tdis_file = None
self._exchange_files = {}
self._ims_files = {}
self._ghost_node_files = {}
self._mover_files = {}
self._mvt_files = {}
self._other_files = {}
self.structure = fpdata.sim_struct
self.model_type = None
self._exg_file_num = {}
self._gnc_file_num = 0
self._mvr_file_num = 0
self._mvt_file_num = 0
self.simulation_data.mfpath.set_last_accessed_path()
# build simulation name file
self.name_file = mfnam.ModflowNam(
self,
filename="mfsim.nam",
continue_=continue_,
nocheck=nocheck,
memory_print_option=memory_print_option,
)
# try to build directory structure
sim_path = self.simulation_data.mfpath.get_sim_path()
if not os.path.isdir(sim_path):
try:
os.makedirs(sim_path)
except OSError as e:
if (
self.simulation_data.verbosity_level.value
>= VerbosityLevel.quiet.value
):
print(
"An error occurred when trying to create the "
"directory {}: {}".format(sim_path, e.strerror)
)
# set simulation validity initially to false since the user must first
# add at least one model to the simulation and fill out the name and
# tdis files
self.valid = False
def __getattr__(self, item):
"""
Override __getattr__ to allow retrieving models.
__getattr__ is used to allow for getting models and packages as if
they are attributes
Parameters
----------
item : str
model or package name
Returns
-------
md : Model or package object
Model or package object of type :class:flopy6.mfmodel or
:class:flopy6.mfpackage
"""
if item == "valid" or not hasattr(self, "valid"):
raise AttributeError(item)
models = []
if item in self.structure.model_types:
# get all models of this type
for model in self._models.values():
if model.model_type == item:
models.append(model)
if len(models) > 0:
return models
elif item in self._models:
model = self.get_model(item)
if model is not None:
return model
raise AttributeError(item)
else:
package = self.get_package(item)
if package is not None:
return package
raise AttributeError(item)
def __repr__(self):
"""
Override __repr__ to print custom string.
Returns
--------
repr string : str
string describing object
"""
return self._get_data_str(True)
def __str__(self):
"""
Override __str__ to print custom string.
Returns
--------
str string : str
string describing object
"""
return self._get_data_str(False)
def _get_data_str(self, formal):
file_mgt = self.simulation_data.mfpath
data_str = (
"sim_name = {}\nsim_path = {}\nexe_name = "
"{}\n"
"\n".format(self.name, file_mgt.get_sim_path(), self.exe_name)
)
for package in self._packagelist:
pk_str = package._get_data_str(formal, False)
if formal:
if len(pk_str.strip()) > 0:
data_str = (
"{}###################\nPackage {}\n"
"###################\n\n"
"{}\n".format(data_str, package._get_pname(), pk_str)
)
else:
if len(pk_str.strip()) > 0:
data_str = (
"{}###################\nPackage {}\n"
"###################\n\n"
"{}\n".format(data_str, package._get_pname(), pk_str)
)
for model in self._models.values():
if formal:
mod_repr = repr(model)
if len(mod_repr.strip()) > 0:
data_str = (
"{}@@@@@@@@@@@@@@@@@@@@\nModel {}\n"
"@@@@@@@@@@@@@@@@@@@@\n\n"
"{}\n".format(data_str, model.name, mod_repr)
)
else:
mod_str = str(model)
if len(mod_str.strip()) > 0:
data_str = (
"{}@@@@@@@@@@@@@@@@@@@@\nModel {}\n"
"@@@@@@@@@@@@@@@@@@@@\n\n"
"{}\n".format(data_str, model.name, mod_str)
)
return data_str
@property
def model_names(self):
"""
Return a list of model names associated with this simulation.
Returns
--------
list: list of model names
"""
return self._models.keys()
@property
def exchange_files(self):
"""
Return list of exchange files associated with this simulation.
Returns
--------
list: list of exchange names
"""
return self._exchange_files.values()
[docs] @classmethod
def load(
cls,
sim_name="modflowsim",
version="mf6",
exe_name="mf6.exe",
sim_ws=".",
strict=True,
verbosity_level=1,
load_only=None,
verify_data=False,
write_headers=True,
):
"""
Load an existing model.
Parameters
----------
sim_name : str
Name of the simulation.
version : str
MODFLOW version
exe_name : str
Relative path to MODFLOW executable from the simulation working
folder
sim_ws : str
Path to simulation working folder
strict : bool
Strict enforcement of file formatting
verbosity_level : int
Verbosity level of standard output
0: No standard output
1: Standard error/warning messages with some informational
messages
2: Verbose mode with full error/warning/informational
messages. This is ideal for debugging.
load_only : list
List of package abbreviations or package names corresponding to
packages that flopy will load. default is None, which loads all
packages. the discretization packages will load regardless of this
setting. subpackages, like time series and observations, will also
load regardless of this setting.
example list: ['ic', 'maw', 'npf', 'oc', 'ims', 'gwf6-gwf6']
verify_data : bool
Verify data when it is loaded. this can slow down loading
write_headers: bool
When true flopy writes a header to each package file indicating
that it was created by flopy
Returns
-------
sim : MFSimulation object
Examples
--------
>>> s = flopy.mf6.mfsimulation.load('my simulation')
"""
# initialize
instance = cls(
sim_name,
version,
exe_name,
sim_ws,
verbosity_level,
write_headers=write_headers,
)
verbosity_level = instance.simulation_data.verbosity_level
instance.simulation_data.verify_data = verify_data
if verbosity_level.value >= VerbosityLevel.normal.value:
print("loading simulation...")
# build case consistent load_only dictionary for quick lookups
load_only = instance._load_only_dict(load_only)
# load simulation name file
if verbosity_level.value >= VerbosityLevel.normal.value:
print(" loading simulation name file...")
instance.name_file.load(strict)
# load TDIS file
tdis_pkg = f"tdis{mfstructure.MFStructure().get_version_string()}"
tdis_attr = getattr(instance.name_file, tdis_pkg)
instance._tdis_file = mftdis.ModflowTdis(
instance, filename=tdis_attr.get_data()
)
instance._tdis_file._filename = instance.simulation_data.mfdata[
("nam", "timing", tdis_pkg)
].get_data()
if verbosity_level.value >= VerbosityLevel.normal.value:
print(" loading tdis package...")
instance._tdis_file.load(strict)
# load models
try:
model_recarray = instance.simulation_data.mfdata[
("nam", "models", "models")
]
models = model_recarray.get_data()
except MFDataException as mfde:
message = (
"Error occurred while loading model names from the "
"simulation name file."
)
raise MFDataException(
mfdata_except=mfde,
model=instance.name,
package="nam",
message=message,
)
for item in models:
# resolve model working folder and name file
path, name_file = os.path.split(item[1])
model_obj = PackageContainer.model_factory(item[0][:-1].lower())
# load model
if verbosity_level.value >= VerbosityLevel.normal.value:
print(f" loading model {item[0].lower()}...")
instance._models[item[2]] = model_obj.load(
instance,
instance.structure.model_struct_objs[item[0].lower()],
item[2],
name_file,
version,
exe_name,
strict,
path,
load_only,
)
# load exchange packages and dependent packages
try:
exchange_recarray = instance.name_file.exchanges
has_exch_data = exchange_recarray.has_data()
except MFDataException as mfde:
message = (
"Error occurred while loading exchange names from the "
"simulation name file."
)
raise MFDataException(
mfdata_except=mfde,
model=instance.name,
package="nam",
message=message,
)
if has_exch_data:
try:
exch_data = exchange_recarray.get_data()
except MFDataException as mfde:
message = (
"Error occurred while loading exchange names from "
"the simulation name file."
)
raise MFDataException(
mfdata_except=mfde,
model=instance.name,
package="nam",
message=message,
)
for exgfile in exch_data:
if load_only is not None and not instance._in_pkg_list(
load_only, exgfile[0], exgfile[2]
):
if (
instance.simulation_data.verbosity_level.value
>= VerbosityLevel.normal.value
):
print(f" skipping package {exgfile[0].lower()}...")
continue
# get exchange type by removing numbers from exgtype
exchange_type = "".join(
[char for char in exgfile[0] if not char.isdigit()]
).upper()
# get exchange number for this type
if exchange_type not in instance._exg_file_num:
exchange_file_num = 0
instance._exg_file_num[exchange_type] = 1
else:
exchange_file_num = instance._exg_file_num[exchange_type]
instance._exg_file_num[exchange_type] += 1
exchange_name = f"{exchange_type}_EXG_{exchange_file_num}"
# find package class the corresponds to this exchange type
package_obj = instance.package_factory(
exchange_type.replace("-", "").lower(), ""
)
if not package_obj:
message = (
"An error occurred while loading the "
"simulation name file. Invalid exchange type "
'"{}" specified.'.format(exchange_type)
)
type_, value_, traceback_ = sys.exc_info()
raise MFDataException(
instance.name,
"nam",
"nam",
"loading simulation name file",
exchange_recarray.structure.name,
inspect.stack()[0][3],
type_,
value_,
traceback_,
message,
instance._simulation_data.debug,
)
# build and load exchange package object
exchange_file = package_obj(
instance,
exgtype=exgfile[0],
exgmnamea=exgfile[2],
exgmnameb=exgfile[3],
filename=exgfile[1],
pname=exchange_name,
loading_package=True,
)
if verbosity_level.value >= VerbosityLevel.normal.value:
print(
f" loading exchange package {exchange_file._get_pname()}..."
)
exchange_file.load(strict)
instance._exchange_files[exgfile[1]] = exchange_file
# load simulation packages
solution_recarray = instance.simulation_data.mfdata[
("nam", "solutiongroup", "solutiongroup")
]
try:
solution_group_dict = solution_recarray.get_data()
except MFDataException as mfde:
message = (
"Error occurred while loading solution groups from "
"the simulation name file."
)
raise MFDataException(
mfdata_except=mfde,
model=instance.name,
package="nam",
message=message,
)
for solution_group in solution_group_dict.values():
for solution_info in solution_group:
if load_only is not None and not instance._in_pkg_list(
load_only, solution_info[0], solution_info[2]
):
if (
instance.simulation_data.verbosity_level.value
>= VerbosityLevel.normal.value
):
print(
f" skipping package {solution_info[0].lower()}..."
)
continue
ims_file = mfims.ModflowIms(
instance, filename=solution_info[1], pname=solution_info[2]
)
if verbosity_level.value >= VerbosityLevel.normal.value:
print(f" loading ims package {ims_file._get_pname()}...")
ims_file.load(strict)
instance.simulation_data.mfpath.set_last_accessed_path()
if verify_data:
instance.check()
return instance
[docs] def check(self, f=None, verbose=True, level=1):
"""
Check model data for common errors.
Parameters
----------
f : str or file handle
String defining file name or file handle for summary file
of check method output. If a string is passed a file handle
is created. If f is None, check method does not write
results to a summary file. (default is None)
verbose : bool
Boolean flag used to determine if check method results are
written to the screen
level : int
Check method analysis level. If level=0, summary checks are
performed. If level=1, full checks are performed.
Returns
-------
check list: list
Python list containing simulation check results
Examples
--------
>>> import flopy
>>> m = flopy.modflow.Modflow.load('model.nam')
>>> m.check()
"""
# check instance for simulation-level check
chk_list = []
# check models
for model in self._models.values():
print(f'Checking model "{model.name}"...')
chk_list.append(model.check(f, verbose, level))
print("Checking for missing simulation packages...")
if self._tdis_file is None:
if chk_list:
chk_list[0]._add_to_summary(
"Error", desc="\r No tdis package", package="model"
)
print("Error: no tdis package")
if len(self._ims_files) == 0:
if chk_list:
chk_list[0]._add_to_summary(
"Error", desc="\r No solver package", package="model"
)
print("Error: no ims package")
return chk_list
@property
def sim_package_list(self):
"""List of all "simulation level" packages"""
package_list = []
if self._tdis_file is not None:
package_list.append(self._tdis_file)
for sim_package in self._ims_files.values():
package_list.append(sim_package)
for sim_package in self._exchange_files.values():
package_list.append(sim_package)
for sim_package in self._mover_files.values():
package_list.append(sim_package)
for sim_package in self._mvt_files.values():
package_list.append(sim_package)
for sim_package in self._other_files.values():
package_list.append(sim_package)
return package_list
[docs] def load_package(
self,
ftype,
fname,
pname,
strict,
ref_path,
dict_package_name=None,
parent_package=None,
):
"""
Load a package from a file.
Parameters
----------
ftype : str
the file type
fname : str
the name of the file containing the package input
pname : str
the user-defined name for the package
strict : bool
strict mode when loading the file
ref_path : str
path to the file. uses local path if set to None
dict_package_name : str
package name for dictionary lookup
parent_package : MFPackage
parent package
"""
if ftype == "gnc":
if fname not in self._ghost_node_files:
# get package type from parent package
if parent_package:
package_abbr = parent_package.package_abbr[0:3]
else:
package_abbr = "GWF"
# build package name and package
gnc_name = f"{package_abbr}-GNC_{self._gnc_file_num}"
ghost_node_file = mfgwfgnc.ModflowGwfgnc(
self,
filename=fname,
pname=gnc_name,
parent_file=parent_package,
loading_package=True,
)
ghost_node_file.load(strict)
self._ghost_node_files[fname] = ghost_node_file
self._gnc_file_num += 1
return ghost_node_file
elif ftype == "mvr":
if fname not in self._mover_files:
# Get package type from parent package
if parent_package:
package_abbr = parent_package.package_abbr[0:3]
else:
package_abbr = "GWF"
# build package name and package
mvr_name = f"{package_abbr}-MVR_{self._mvr_file_num}"
mover_file = mfgwfmvr.ModflowGwfmvr(
self,
filename=fname,
pname=mvr_name,
parent_file=parent_package,
loading_package=True,
)
mover_file.load(strict)
self._mover_files[fname] = mover_file
self._mvr_file_num += 1
return mover_file
elif ftype == "mvt":
if fname not in self._mvt_files:
# Get package type from parent package
if parent_package:
package_abbr = parent_package.package_abbr[0:3]
else:
package_abbr = "GWT"
# build package name and package
mvt_name = f"{package_abbr}-MVT_{self._mvt_file_num}"
mvt_file = mfgwtmvt.ModflowGwtmvt(
self,
filename=fname,
pname=mvt_name,
parent_file=parent_package,
loading_package=True,
)
mvt_file.load(strict)
self._mvt_files[fname] = mvt_file
self._mvt_file_num += 1
return mvt_file
else:
# get package object from file type
package_obj = self.package_factory(ftype, "")
# create package
package = package_obj(
self,
filename=fname,
pname=dict_package_name,
parent_file=parent_package,
loading_package=True,
)
# verify that this is a utility package
utl_struct = mfstructure.MFStructure().sim_struct.utl_struct_objs
if package.package_type in utl_struct:
package.load(strict)
self._other_files[package.filename] = package
# register child package with the simulation
self._add_package(package, package.path)
if parent_package is not None:
# register child package with the parent package
parent_package._add_package(package, package.path)
else:
if (
self.simulation_data.verbosity_level.value
>= VerbosityLevel.normal.value
):
print(
"WARNING: Unsupported file type {} for "
"simulation.".format(package.package_type)
)
return package
[docs] def register_ims_package(self, ims_file, model_list):
"""
Register an ims package with the simulation.
Parameters
ims_file : MFPackage
ims package to register
model_list : list of strings
list of models using the ims package to be registered
"""
if isinstance(model_list, str):
model_list = [model_list]
if not isinstance(ims_file, mfims.ModflowIms):
comment = (
'Parameter "ims_file" is not a valid ims file. '
'Expected type ModflowIms, but type "{}" was given'
".".format(type(ims_file))
)
type_, value_, traceback_ = sys.exc_info()
raise MFDataException(
None,
"ims",
"",
"registering ims package",
"",
inspect.stack()[0][3],
type_,
value_,
traceback_,
comment,
self.simulation_data.debug,
)
# remove models from existing solution groups
if model_list is not None:
for model in model_list:
self._remove_from_all_ims_solution_groups(model)
# register ims package with model list
in_simulation = False
pkg_with_same_name = None
for file in self._ims_files.values():
if file is ims_file:
in_simulation = True
if file.package_name == ims_file.package_name and file != ims_file:
pkg_with_same_name = file
if (
self.simulation_data.verbosity_level.value
>= VerbosityLevel.normal.value
):
print(
"WARNING: ims package with name {} already exists. "
"New ims package will replace old package"
".".format(file.package_name)
)
self._remove_package(self._ims_files[file.filename])
del self._ims_files[file.filename]
break
# register ims package
if not in_simulation:
self._add_package(ims_file, self._get_package_path(ims_file))
# do not allow an ims package to be registered twice with the
# same simulation
if not in_simulation:
# create unique file/package name
if ims_file.package_name is None:
file_num = len(self._ims_files) - 1
ims_file.package_name = f"ims_{file_num}"
if ims_file.filename in self._ims_files:
ims_file.filename = MFFileMgmt.unique_file_name(
ims_file.filename, self._ims_files
)
# add ims package to simulation
self._ims_files[ims_file.filename] = ims_file
# If ims file is being replaced, replace ims filename in
# solution group
if pkg_with_same_name is not None and self._is_in_solution_group(
pkg_with_same_name.filename, 1
):
# change existing solution group to reflect new ims file
self._replace_ims_in_solution_group(
pkg_with_same_name.filename, 1, ims_file.filename
)
# only allow an ims package to be registered to one solution group
elif model_list is not None:
ims_in_group = self._is_in_solution_group(ims_file.filename, 1)
# add solution group to the simulation name file
solution_recarray = self.name_file.solutiongroup
solution_group_list = solution_recarray.get_active_key_list()
if len(solution_group_list) == 0:
solution_group_num = 0
else:
solution_group_num = solution_group_list[-1][0]
if ims_in_group:
self._append_to_ims_solution_group(
ims_file.filename, model_list
)
else:
if self.name_file.mxiter.get_data(solution_group_num) is None:
self.name_file.mxiter.add_transient_key(solution_group_num)
# associate any models in the model list to this
# simulation file
version_string = mfstructure.MFStructure().get_version_string()
ims_pkg = f"ims{version_string}"
new_record = [ims_pkg, ims_file.filename]
for model in model_list:
new_record.append(model)
try:
solution_recarray.append_list_as_record(
new_record, solution_group_num
)
except MFDataException as mfde:
message = (
"Error occurred while updating the "
"simulation name file with the ims package "
'file "{}".'.format(ims_file.filename)
)
raise MFDataException(
mfdata_except=mfde, package="nam", message=message
)
@staticmethod
def _rename_package_group(group_dict, name):
package_type_count = {}
# first build an array to avoid key modification errors
package_array = []
for package in group_dict.values():
package_array.append(package)
# update package file names and count
for package in package_array:
if package.package_type not in package_type_count:
file_name = f"{name}.{package.package_type}"
package_type_count[package.package_type] = 1
else:
package_type_count[package.package_type] += 1
ptc = package_type_count[package.package_type]
file_name = f"{name}_{ptc}.{package.package_type}"
base_filepath = os.path.split(package.filename)[0]
if base_filepath != "":
# include existing relative path in new file name
file_name = os.path.join(base_filepath, file_name)
package.filename = file_name
def _rename_exchange_file(self, package, new_filename):
self._exchange_files[package.filename] = package
try:
exchange_recarray_data = self.name_file.exchanges.get_data()
except MFDataException as mfde:
message = (
"An error occurred while retrieving exchange "
"data from the simulation name file. The error "
"occurred while registering exchange file "
f'"{package.filename}".'
)
raise MFDataException(
mfdata_except=mfde,
package=package._get_pname(),
message=message,
)
if exchange_recarray_data is not None:
for index, exchange in zip(
range(0, len(exchange_recarray_data)),
exchange_recarray_data,
):
if exchange[1] == package.filename:
# update existing exchange
exchange_recarray_data[index][1] = new_filename
ex_recarray = self.name_file.exchanges
try:
ex_recarray.set_data(exchange_recarray_data)
except MFDataException as mfde:
message = (
"An error occurred while setting "
"exchange data in the simulation name "
"file. The error occurred while "
"registering the following "
"values (exgtype, filename, "
f'exgmnamea, exgmnameb): "{package.exgtype} '
f"{package.filename} {package.exgmnamea}"
f'{package.exgmnameb}".'
)
raise MFDataException(
mfdata_except=mfde,
package=package._get_pname(),
message=message,
)
return
def _set_timing_block(self, file_name):
struct_root = mfstructure.MFStructure()
tdis_pkg = "tdis{}".format(struct_root.get_version_string())
tdis_attr = getattr(self.name_file, tdis_pkg)
try:
tdis_attr.set_data(file_name)
except MFDataException as mfde:
message = (
"An error occurred while setting the tdis package "
f'file name "{file_name}". The error occurred while '
"registering the tdis package with the "
"simulation"
)
raise MFDataException(
mfdata_except=mfde,
package=file_name,
message=message,
)
[docs] def update_package_filename(self, package, new_name):
"""
Updates internal arrays to be consistent with a new file name.
This is for internal flopy library use only.
Parameters
----------
package: MFPackage
Package with new name
new_name: str
Package's new name
"""
if (
self._tdis_file is not None
and package.filename == self._tdis_file.filename
):
self._set_timing_block(new_name)
if package.filename in self._exchange_files:
self._exchange_files[new_name] = self._exchange_files.pop(
package.filename
)
self._rename_exchange_file(package, new_name)
if package.filename in self._ims_files:
self._ims_files[new_name] = self._ims_files.pop(package.filename)
self._update_ims_solution_group(package.filename, new_name)
if package.filename in self._ghost_node_files:
self._update_exg_files_gnc(package.filename, new_name)
self._ghost_node_files[new_name] = self._ghost_node_files.pop(
package.filename
)
if package.filename in self._mover_files:
self._update_exg_files_mvr(package.filename, new_name)
self._mover_files[new_name] = self._mover_files.pop(
package.filename
)
if package.filename in self._mvt_files:
self._update_exg_files_mvr(package.filename, new_name)
self._mvt_files[new_name] = self._mvt_files.pop(package.filename)
if package.filename in self._other_files:
self._other_files[new_name] = self._other_files.pop(
package.filename
)
[docs] def rename_all_packages(self, name):
"""
Rename all packages with name as prefix.
Parameters
----------
name: str
Prefix of package names
"""
if self._tdis_file is not None:
self._tdis_file.filename = f"{name}.{self._tdis_file.package_type}"
self._rename_package_group(self._exchange_files, name)
self._rename_package_group(self._ims_files, name)
self._rename_package_group(self._ghost_node_files, name)
self._rename_package_group(self._mover_files, name)
self._rename_package_group(self._mvt_files, name)
self._rename_package_group(self._other_files, name)
for model in self._models.values():
model.rename_all_packages(name)
[docs] def set_all_data_external(
self, check_data=True, external_data_folder=None
):
"""Sets the simulation's list and array data to be stored externally.
Parameters
----------
check_data: bool
Determines if data error checking is enabled during this
process. Data error checking can be slow on large datasets.
external_data_folder
Folder, relative to the simulation path or model relative path
(see use_model_relative_path parameter), where external data
will be stored
"""
# copy any files whose paths have changed
self.simulation_data.mfpath.copy_files()
# set data external for all packages in all models
for model in self._models.values():
model.set_all_data_external(check_data, external_data_folder)
# set data external for ims packages
for package in self._ims_files.values():
package.set_all_data_external(check_data, external_data_folder)
# set data external for ghost node packages
for package in self._ghost_node_files.values():
package.set_all_data_external(check_data, external_data_folder)
# set data external for mover packages
for package in self._mover_files.values():
package.set_all_data_external(check_data, external_data_folder)
for package in self._mvt_files.values():
package.set_all_data_external(check_data, external_data_folder)
for package in self._exchange_files.values():
package.set_all_data_external(check_data, external_data_folder)
[docs] def set_all_data_internal(self, check_data=True):
# set data external for all packages in all models
for model in self._models.values():
model.set_all_data_internal(check_data)
# set data external for ims packages
for package in self._ims_files.values():
package.set_all_data_internal(check_data)
# set data external for ghost node packages
for package in self._ghost_node_files.values():
package.set_all_data_internal(check_data)
# set data external for mover packages
for package in self._mover_files.values():
package.set_all_data_internal(check_data)
for package in self._mvt_files.values():
package.set_all_data_internal(check_data)
for package in self._exchange_files.values():
package.set_all_data_internal(check_data)
[docs] def write_simulation(
self, ext_file_action=ExtFileAction.copy_relative_paths, silent=False
):
"""
Write the simulation to files.
Parameters
ext_file_action : ExtFileAction
Defines what to do with external files when the simulation
path has changed. Defaults to copy_relative_paths which
copies only files with relative paths, leaving files defined
by absolute paths fixed.
silent : bool
Writes out the simulation in silent mode (verbosity_level = 0)
"""
sim_data = self.simulation_data
if not sim_data.max_columns_user_set:
# search for dis packages
for model in self._models.values():
dis = model.get_package("dis")
if dis is not None and hasattr(dis, "ncol"):
sim_data.max_columns_of_data = dis.ncol.get_data()
sim_data.max_columns_user_set = False
sim_data.max_columns_auto_set = True
saved_verb_lvl = self.simulation_data.verbosity_level
if silent:
self.simulation_data.verbosity_level = VerbosityLevel.quiet
# write simulation name file
if (
self.simulation_data.verbosity_level.value
>= VerbosityLevel.normal.value
):
print("writing simulation...")
print(" writing simulation name file...")
self.name_file.write(ext_file_action=ext_file_action)
# write TDIS file
if (
self.simulation_data.verbosity_level.value
>= VerbosityLevel.normal.value
):
print(" writing simulation tdis package...")
self._tdis_file.write(ext_file_action=ext_file_action)
# write ims files
for ims_file in self._ims_files.values():
if (
self.simulation_data.verbosity_level.value
>= VerbosityLevel.normal.value
):
print(f" writing ims package {ims_file._get_pname()}...")
ims_file.write(ext_file_action=ext_file_action)
# write exchange files
for exchange_file in self._exchange_files.values():
exchange_file.write()
if (
hasattr(exchange_file, "gnc_filerecord")
and exchange_file.gnc_filerecord.has_data()
):
try:
gnc_file = exchange_file.gnc_filerecord.get_data()[0][0]
except MFDataException as mfde:
message = (
"An error occurred while retrieving the ghost "
"node file record from exchange file "
'"{}".'.format(exchange_file.filename)
)
raise MFDataException(
mfdata_except=mfde,
package=exchange_file._get_pname(),
message=message,
)
if gnc_file in self._ghost_node_files:
if (
self.simulation_data.verbosity_level.value
>= VerbosityLevel.normal.value
):
print(
" writing gnc package {}...".format(
self._ghost_node_files[gnc_file]._get_pname()
)
)
self._ghost_node_files[gnc_file].write(
ext_file_action=ext_file_action
)
else:
if (
self.simulation_data.verbosity_level.value
>= VerbosityLevel.normal.value
):
print(
"WARNING: Ghost node file {} not loaded prior to"
" writing. File will not be written"
".".format(gnc_file)
)
if (
hasattr(exchange_file, "mvr_filerecord")
and exchange_file.mvr_filerecord.has_data()
):
try:
mvr_file = exchange_file.mvr_filerecord.get_data()[0][0]
except MFDataException as mfde:
message = (
"An error occurred while retrieving the mover "
"file record from exchange file "
'"{}".'.format(exchange_file.filename)
)
raise MFDataException(
mfdata_except=mfde,
package=exchange_file._get_pname(),
message=message,
)
if mvr_file in self._mover_files:
if (
self.simulation_data.verbosity_level.value
>= VerbosityLevel.normal.value
):
print(
" writing mvr package {}...".format(
self._mover_files[mvr_file]._get_pname()
)
)
self._mover_files[mvr_file].write(
ext_file_action=ext_file_action
)
else:
if (
self.simulation_data.verbosity_level.value
>= VerbosityLevel.normal.value
):
print(
"WARNING: Mover file {} not loaded prior to "
"writing. File will not be "
"written.".format(mvr_file)
)
if (
hasattr(exchange_file, "mvt_filerecord")
and exchange_file.mvt_filerecord.has_data()
):
try:
mvt_file = exchange_file.mvt_filerecord.get_data()[0][0]
except MFDataException as mfde:
message = (
"An error occurred while retrieving the mover transport "
"file record from exchange file "
'"{}".'.format(exchange_file.filename)
)
raise MFDataException(
mfdata_except=mfde,
package=exchange_file._get_pname(),
message=message,
)
if mvt_file in self._mvt_files:
if (
self.simulation_data.verbosity_level.value
>= VerbosityLevel.normal.value
):
print(
" writing mvr package {}...".format(
self._mvt_files[mvt_file]._get_pname()
)
)
self._mvt_files[mvt_file].write(
ext_file_action=ext_file_action
)
else:
if (
self.simulation_data.verbosity_level.value
>= VerbosityLevel.normal.value
):
print(
"WARNING: Mover transport file {} not loaded prior to "
"writing. File will not be "
"written.".format(mvt_file)
)
# write other packages
for pp in self._other_files.values():
if (
self.simulation_data.verbosity_level.value
>= VerbosityLevel.normal.value
):
print(f" writing package {pp._get_pname()}...")
pp.write(ext_file_action=ext_file_action)
# FIX: model working folder should be model name file folder
# write models
for model in self._models.values():
if (
self.simulation_data.verbosity_level.value
>= VerbosityLevel.normal.value
):
print(f" writing model {model.name}...")
model.write(ext_file_action=ext_file_action)
self.simulation_data.mfpath.set_last_accessed_path()
if silent:
self.simulation_data.verbosity_level = saved_verb_lvl
[docs] def set_sim_path(self, path):
"""Return a list of output data keys.
Parameters
----------
path : str
Relative or absolute path to simulation root folder.
"""
# set all data internal
self.set_all_data_internal()
# set simulation path
self.simulation_data.mfpath.set_sim_path(path, True)
if not os.path.exists(path):
# create new simulation folder
os.makedirs(path)
[docs] def run_simulation(
self,
silent=None,
pause=False,
report=False,
normal_msg="normal termination",
use_async=False,
cargs=None,
):
"""
Run the simulation.
Parameters
----------
silent: bool
Run in silent mode
pause: bool
Pause at end of run
report: bool
Save stdout lines to a list (buff)
normal_msg: str or list
Normal termination message used to determine if the run
terminated normally. More than one message can be provided
using a list. (default is 'normal termination')
use_async : bool
Asynchronously read model stdout and report with timestamps.
good for models that take long time to run. not good for
models that run really fast
cargs : str or list of strings
Additional command line arguments to pass to the executable.
default is None
Returns
--------
success : bool
buff : list of lines of stdout
"""
if silent is None:
if (
self.simulation_data.verbosity_level.value
>= VerbosityLevel.normal.value
):
silent = False
else:
silent = True
return run_model(
self.exe_name,
None,
self.simulation_data.mfpath.get_sim_path(),
silent=silent,
pause=pause,
report=report,
normal_msg=normal_msg,
use_async=use_async,
cargs=cargs,
)
[docs] def delete_output_files(self):
"""Deletes simulation output files."""
output_req = binaryfile_utils.MFOutputRequester
output_file_keys = output_req.getkeys(
self.simulation_data.mfdata, self.simulation_data.mfpath, False
)
for path in output_file_keys.binarypathdict.values():
if os.path.isfile(path):
os.remove(path)
[docs] def remove_package(self, package_name):
"""
Removes package from the simulation. `package_name` can be the
package's name, type, or package object to be removed from the model.
Parameters
----------
package_name : str
Name of package to be removed
"""
if isinstance(package_name, MFPackage):
packages = [package_name]
else:
packages = self.get_package(package_name)
if not isinstance(packages, list):
packages = [packages]
for package in packages:
if (
self._tdis_file is not None
and package.path == self._tdis_file.path
):
self._tdis_file = None
if package.filename in self._exchange_files:
del self._exchange_files[package.filename]
if package.filename in self._ims_files:
del self._ims_files[package.filename]
self._update_ims_solution_group(package.filename)
if package.filename in self._ghost_node_files:
del self._ghost_node_files[package.filename]
if package.filename in self._mover_files:
del self._mover_files[package.filename]
if package.filename in self._mvt_files:
del self._mvt_files[package.filename]
if package.filename in self._other_files:
del self._other_files[package.filename]
self._remove_package(package)
@property
def model_dict(self):
"""
Return a dictionary of models associated with this simulation.
Returns
--------
model dict : dict
dictionary of models
"""
return self._models.copy()
[docs] def get_model(self, model_name=None):
"""
Returns the models in the simulation with a given model name, name
file name, or model type.
Parameters
----------
model_name : str
Name of the model to get. Passing in None or an empty list
will get the first model.
Returns
--------
model : MFModel
"""
if len(self._models) == 0:
return None
if model_name is None:
for model in self._models.values():
return model
if model_name in self._models:
return self._models[model_name]
# do case-insensitive lookup
for name, model in self._models.items():
if model_name.lower() == name.lower():
return model
return None
[docs] def get_exchange_file(self, filename):
"""
Get a specified exchange file.
Parameters
----------
filename : str
Name of exchange file to get
Returns
--------
exchange package : MFPackage
"""
if filename in self._exchange_files:
return self._exchange_files[filename]
else:
excpt_str = f'Exchange file "{filename}" can not be found.'
raise FlopyException(excpt_str)
[docs] def get_mvr_file(self, filename):
"""
Get a specified mover file.
Parameters
----------
filename : str
Name of mover file to get
Returns
--------
mover package : MFPackage
"""
if filename in self._mover_files:
return self._mover_files[filename]
else:
excpt_str = f'MVR file "{filename}" can not be found.'
raise FlopyException(excpt_str)
[docs] def get_mvt_file(self, filename):
"""
Get a specified mvt file.
Parameters
----------
filename : str
Name of mover transport file to get
Returns
--------
mover transport package : MFPackage
"""
if filename in self._mvt_files:
return self._mvt_files[filename]
else:
excpt_str = f'MVT file "{filename}" can not be found.'
raise FlopyException(excpt_str)
[docs] def get_gnc_file(self, filename):
"""
Get a specified gnc file.
Parameters
----------
filename : str
Name of gnc file to get
Returns
--------
gnc package : MFPackage
"""
if filename in self._ghost_node_files:
return self._ghost_node_files[filename]
else:
excpt_str = f'GNC file "{filename}" can not be found.'
raise FlopyException(excpt_str)
[docs] def remove_exchange_file(self, package):
"""
Removes the exchange file "package". This is for internal flopy
library use only.
Parameters
----------
package: MFPackage
Exchange package to be removed
"""
self._exchange_files[package.filename] = package
try:
exchange_recarray_data = self.name_file.exchanges.get_data()
except MFDataException as mfde:
message = (
"An error occurred while retrieving exchange "
"data from the simulation name file. The error "
"occurred while registering exchange file "
f'"{package.filename}".'
)
raise MFDataException(
mfdata_except=mfde,
package=package._get_pname(),
message=message,
)
remove_indices = []
if exchange_recarray_data is not None:
for index, exchange in zip(
range(0, len(exchange_recarray_data)),
exchange_recarray_data,
):
if (
package.filename is not None
and exchange[1] == package.filename
):
remove_indices.append(index)
if len(remove_indices) > 0:
self.name_file.exchanges.set_data(
np.delete(exchange_recarray_data, remove_indices)
)
[docs] def register_exchange_file(self, package):
"""
Register an exchange package file with the simulation. This is a
call-back method made from the package and should not be called
directly.
Parameters
----------
package : MFPackage
Exchange package object to register
"""
if package.filename not in self._exchange_files:
exgtype = package.exgtype
exgmnamea = package.exgmnamea
exgmnameb = package.exgmnameb
if exgtype is None or exgmnamea is None or exgmnameb is None:
excpt_str = (
"Exchange packages require that exgtype, "
"exgmnamea, and exgmnameb are specified."
)
raise FlopyException(excpt_str)
self._exchange_files[package.filename] = package
try:
exchange_recarray_data = self.name_file.exchanges.get_data()
except MFDataException as mfde:
message = (
"An error occurred while retrieving exchange "
"data from the simulation name file. The error "
"occurred while registering exchange file "
f'"{package.filename}".'
)
raise MFDataException(
mfdata_except=mfde,
package=package._get_pname(),
message=message,
)
if exchange_recarray_data is not None:
for index, exchange in zip(
range(0, len(exchange_recarray_data)),
exchange_recarray_data,
):
if exchange[1] == package.filename:
# update existing exchange
exchange_recarray_data[index][0] = exgtype
exchange_recarray_data[index][2] = exgmnamea
exchange_recarray_data[index][3] = exgmnameb
ex_recarray = self.name_file.exchanges
try:
ex_recarray.set_data(exchange_recarray_data)
except MFDataException as mfde:
message = (
"An error occurred while setting "
"exchange data in the simulation name "
"file. The error occurred while "
"registering the following "
"values (exgtype, filename, "
f'exgmnamea, exgmnameb): "{exgtype} '
f"{package.filename} {exgmnamea}"
f'{exgmnameb}".'
)
raise MFDataException(
mfdata_except=mfde,
package=package._get_pname(),
message=message,
)
return
try:
# add new exchange
self.name_file.exchanges.append_data(
[(exgtype, package.filename, exgmnamea, exgmnameb)]
)
except MFDataException as mfde:
message = (
"An error occurred while setting exchange data "
"in the simulation name file. The error occurred "
"while registering the following values (exgtype, "
f'filename, exgmnamea, exgmnameb): "{exgtype} '
f'{package.filename} {exgmnamea} {exgmnameb}".'
)
raise MFDataException(
mfdata_except=mfde,
package=package._get_pname(),
message=message,
)
if (
package.dimensions is None
or package.dimensions.model_dim is None
):
# resolve exchange package dimensions object
package.dimensions = package.create_package_dimensions()
def _remove_package_by_type(self, package):
pname = None
if package.package_name is not None:
pname = package.package_name.lower()
if (
package.package_type.lower() == "tdis"
and self._tdis_file is not None
and self._tdis_file in self._packagelist
):
# tdis package already exists. there can be only one tdis
# package. remove existing tdis package
if (
self.simulation_data.verbosity_level.value
>= VerbosityLevel.normal.value
):
print(
"WARNING: tdis package already exists. Replacing "
"existing tdis package."
)
self._remove_package(self._tdis_file)
elif (
package.package_type.lower() == "gnc"
and package.filename in self._ghost_node_files
and self._ghost_node_files[package.filename] in self._packagelist
):
# gnc package with same file name already exists. remove old
# gnc package
if (
self.simulation_data.verbosity_level.value
>= VerbosityLevel.normal.value
):
print(
f"WARNING: gnc package with name {pname} already exists. "
"Replacing existing gnc package."
)
self._remove_package(self._ghost_node_files[package.filename])
del self._ghost_node_files[package.filename]
elif (
package.package_type.lower() == "mvr"
and package.filename in self._mover_files
and self._mover_files[package.filename] in self._packagelist
):
# mvr package with same file name already exists. remove old
# mvr package
if (
self.simulation_data.verbosity_level.value
>= VerbosityLevel.normal.value
):
print(
f"WARNING: mvr package with name {pname} already exists. "
"Replacing existing mvr package."
)
self._remove_package(self._mover_files[package.filename])
del self._mover_files[package.filename]
elif (
package.package_type.lower() == "mvt"
and package.filename in self._mvt_files
and self._mvt_files[package.filename] in self._packagelist
):
# mvr package with same file name already exists. remove old
# mvr package
if (
self.simulation_data.verbosity_level.value
>= VerbosityLevel.normal.value
):
print(
f"WARNING: mvt package with name {pname} already exists. "
"Replacing existing mvr package."
)
self._remove_package(self._mvt_files[package.filename])
del self._mvt_files[package.filename]
elif (
package.package_type.lower() != "ims"
and pname in self.package_name_dict
):
if (
self.simulation_data.verbosity_level.value
>= VerbosityLevel.normal.value
):
print(
"WARNING: Package with name "
f"{package.package_name.lower()} already exists. "
"Replacing existing package."
)
self._remove_package(self.package_name_dict[pname])
[docs] def register_package(
self,
package,
add_to_package_list=True,
set_package_name=True,
set_package_filename=True,
):
"""
Register a package file with the simulation. This is a
call-back method made from the package and should not be called
directly.
Parameters
----------
package : MFPackage
Package to register
add_to_package_list : bool
Add package to lookup list
set_package_name : bool
Produce a package name for this package
set_package_filename : bool
Produce a filename for this package
Returns
--------
(path : tuple, package structure : MFPackageStructure)
"""
package.container_type = [PackageContainerType.simulation]
path = self._get_package_path(package)
if add_to_package_list and package.package_type.lower != "nam":
self._remove_package_by_type(package)
if package.package_type.lower() != "ims":
# all but ims packages get added here. ims packages are
# added during ims package registration
self._add_package(package, path)
if package.package_type.lower() == "nam":
return path, self.structure.name_file_struct_obj
elif package.package_type.lower() == "tdis":
self._tdis_file = package
self._set_timing_block(package.quoted_filename)
return (
path,
self.structure.package_struct_objs[
package.package_type.lower()
],
)
elif package.package_type.lower() == "gnc":
if package.filename not in self._ghost_node_files:
self._ghost_node_files[package.filename] = package
self._gnc_file_num += 1
elif self._ghost_node_files[package.filename] != package:
# auto generate a unique file name and register it
file_name = MFFileMgmt.unique_file_name(
package.filename, self._ghost_node_files
)
package.filename = file_name
self._ghost_node_files[file_name] = package
elif package.package_type.lower() == "mvr":
if package.filename not in self._mover_files:
self._mover_files[package.filename] = package
else:
# auto generate a unique file name and register it
file_name = MFFileMgmt.unique_file_name(
package.filename, self._mover_files
)
package.filename = file_name
self._mover_files[file_name] = package
elif package.package_type.lower() == "mvt":
if package.filename not in self._mvt_files:
self._mvt_files[package.filename] = package
else:
# auto generate a unique file name and register it
file_name = MFFileMgmt.unique_file_name(
package.filename, self._mvt_files
)
package.filename = file_name
self._mvt_files[file_name] = package
elif package.package_type.lower() == "ims":
# default behavior is to register the ims package with the first
# unregistered model
unregistered_models = []
for model in self._models:
model_registered = self._is_in_solution_group(model, 2, True)
if not model_registered:
unregistered_models.append(model)
if unregistered_models:
self.register_ims_package(package, unregistered_models)
else:
self.register_ims_package(package, None)
return (
path,
self.structure.package_struct_objs[
package.package_type.lower()
],
)
else:
self._other_files[package.filename] = package
if package.package_type.lower() in self.structure.package_struct_objs:
return (
path,
self.structure.package_struct_objs[
package.package_type.lower()
],
)
elif package.package_type.lower() in self.structure.utl_struct_objs:
return (
path,
self.structure.utl_struct_objs[package.package_type.lower()],
)
else:
excpt_str = (
'Invalid package type "{}". Unable to register '
"package.".format(package.package_type)
)
print(excpt_str)
raise FlopyException(excpt_str)
[docs] def rename_model_namefile(self, model, new_namefile):
"""
Rename a model's namefile. For internal flopy library use only.
Parameters
----------
model : MFModel
Model object whose namefile to rename
new_namefile : str
Name of the new namefile
"""
# update simulation name file
models = self.name_file.models.get_data()
for mdl in models:
path, name_file_name = os.path.split(mdl[1])
if name_file_name == model.name_file.filename:
mdl[1] = os.path.join(path, new_namefile)
self.name_file.models.set_data(models)
[docs] def register_model(self, model, model_type, model_name, model_namefile):
"""
Add a model to the simulation. This is a call-back method made
from the package and should not be called directly.
Parameters
----------
model : MFModel
Model object to add to simulation
sln_group : str
Solution group of model
Returns
--------
model_structure_object : MFModelStructure
"""
# get model structure from model type
if model_type not in self.structure.model_struct_objs:
message = f'Invalid model type: "{model_type}".'
type_, value_, traceback_ = sys.exc_info()
raise MFDataException(
model.name,
"",
model.name,
"registering model",
"sim",
inspect.stack()[0][3],
type_,
value_,
traceback_,
message,
self.simulation_data.debug,
)
# add model
self._models[model_name] = model
# update simulation name file
self.name_file.models.append_list_as_record(
[model_type, model_namefile, model_name]
)
if len(self._ims_files) > 0:
# register model with first ims file found
first_ims_key = next(iter(self._ims_files))
self.register_ims_package(
self._ims_files[first_ims_key], model_name
)
return self.structure.model_struct_objs[model_type]
[docs] def get_ims_package(self, key):
"""
Get the ims package with the specified `key`.
Parameters
----------
key : str
ims package file name
Returns
--------
ims_package : ModflowIms
"""
if key in self._ims_files:
return self._ims_files[key]
return None
[docs] def remove_model(self, model_name):
"""
Remove model with name `model_name` from the simulation
Parameters
----------
model_name : str
Model name to remove from simulation
"""
# Remove model
del self._models[model_name]
# TODO: Fully implement this
# Update simulation name file
[docs] def is_valid(self):
"""
Checks the validity of the solution and all of its models and
packages. Returns true if the solution is valid, false if it is not.
Returns
--------
valid : bool
Whether this is a valid simulation
"""
# name file valid
if not self.name_file.is_valid():
return False
# tdis file valid
if not self._tdis_file.is_valid():
return False
# exchanges valid
for exchange in self._exchange_files:
if not exchange.is_valid():
return False
# ims files valid
for imsfile in self._ims_files.values():
if not imsfile.is_valid():
return False
# a model exists
if not self._models:
return False
# models valid
for key in self._models:
if not self._models[key].is_valid():
return False
# each model has an imsfile
return True
@staticmethod
def _resolve_verbosity_level(verbosity_level):
if verbosity_level == 0:
return VerbosityLevel.quiet
elif verbosity_level == 1:
return VerbosityLevel.normal
elif verbosity_level == 2:
return VerbosityLevel.verbose
else:
return verbosity_level
@staticmethod
def _get_package_path(package):
if package.parent_file is not None:
return (package.parent_file.path) + (package.package_type,)
else:
return (package.package_type,)
def _update_ims_solution_group(self, ims_file, new_name=None):
solution_recarray = self.name_file.solutiongroup
for solution_group_num in solution_recarray.get_active_key_list():
try:
rec_array = solution_recarray.get_data(solution_group_num[0])
except MFDataException as mfde:
message = (
"An error occurred while getting solution group"
'"{}" from the simulation name file'
".".format(solution_group_num[0])
)
raise MFDataException(
mfdata_except=mfde, package="nam", message=message
)
new_array = []
for record in rec_array:
if record.slnfname == ims_file:
if new_name is not None:
record.slnfname = new_name
new_array.append(tuple(record))
else:
continue
else:
new_array.append(record)
if not new_array:
new_array = None
solution_recarray.set_data(new_array, solution_group_num[0])
def _remove_from_all_ims_solution_groups(self, modelname):
solution_recarray = self.name_file.solutiongroup
for solution_group_num in solution_recarray.get_active_key_list():
try:
rec_array = solution_recarray.get_data(solution_group_num[0])
except MFDataException as mfde:
message = (
"An error occurred while getting solution group"
'"{}" from the simulation name file'
".".format(solution_group_num[0])
)
raise MFDataException(
mfdata_except=mfde, package="nam", message=message
)
new_array = ["no_check"]
for index, record in enumerate(rec_array):
new_record = []
new_record.append(record[0])
new_record.append(record[1])
for item in list(record)[2:]:
if item is not None and item.lower() != modelname.lower():
new_record.append(item)
new_array.append(tuple(new_record))
solution_recarray.set_data(new_array, solution_group_num[0])
def _append_to_ims_solution_group(self, ims_file, new_models):
# clear models out of solution groups
if new_models is not None:
for model in new_models:
self._remove_from_all_ims_solution_groups(model)
# append models to ims_file
solution_recarray = self.name_file.solutiongroup
for solution_group_num in solution_recarray.get_active_key_list():
try:
rec_array = solution_recarray.get_data(solution_group_num[0])
except MFDataException as mfde:
message = (
"An error occurred while getting solution group"
'"{}" from the simulation name file'
".".format(solution_group_num[0])
)
raise MFDataException(
mfdata_except=mfde, package="nam", message=message
)
new_array = []
for index, record in enumerate(rec_array):
new_record = []
rec_model_dict = {}
for index, item in enumerate(record):
if (
record[1] == ims_file or item not in new_models
) and item is not None:
new_record.append(item)
if index > 1 and item is not None:
rec_model_dict[item.lower()] = 1
if record[1] == ims_file:
for model in new_models:
if model.lower() not in rec_model_dict:
new_record.append(model)
new_array.append(tuple(new_record))
solution_recarray.set_data(new_array, solution_group_num[0])
def _replace_ims_in_solution_group(self, item, index, new_item):
solution_recarray = self.name_file.solutiongroup
for solution_group_num in solution_recarray.get_active_key_list():
try:
rec_array = solution_recarray.get_data(solution_group_num[0])
except MFDataException as mfde:
message = (
"An error occurred while getting solution group"
'"{}" from the simulation name file. The error '
'occurred while replacing IMS file "{}" with "{}"'
'at index "{}"'.format(
solution_group_num[0], item, new_item, index
)
)
raise MFDataException(
mfdata_except=mfde, package="nam", message=message
)
if rec_array is not None:
for rec_item in rec_array:
if rec_item[index] == item:
rec_item[index] = new_item
def _is_in_solution_group(self, item, index, any_idx_after=False):
solution_recarray = self.name_file.solutiongroup
for solution_group_num in solution_recarray.get_active_key_list():
try:
rec_array = solution_recarray.get_data(solution_group_num[0])
except MFDataException as mfde:
message = (
"An error occurred while getting solution group"
'"{}" from the simulation name file. The error '
'occurred while verifying file "{}" at index "{}" '
"is in the simulation name file"
".".format(solution_group_num[0], item, index)
)
raise MFDataException(
mfdata_except=mfde, package="nam", message=message
)
if rec_array is not None:
for rec_item in rec_array:
if any_idx_after:
for idx in range(index, len(rec_item)):
if rec_item[idx] == item:
return True
else:
if rec_item[index] == item:
return True
return False
def _update_exg_files_gnc(self, gnc_filename, new_filename):
for exchange_file in self._exchange_files.values():
if (
hasattr(exchange_file, "gnc_filerecord")
and exchange_file.gnc_filerecord.has_data()
):
try:
gnc_file = exchange_file.gnc_filerecord.get_data()
except MFDataException as mfde:
message = (
"An error occurred while retrieving the ghost "
"node file record from exchange file "
'"{}".'.format(exchange_file.filename)
)
raise MFDataException(
mfdata_except=mfde,
package=exchange_file._get_pname(),
message=message,
)
if gnc_file[0][0] == gnc_filename:
gnc_file[0][0] = new_filename
exchange_file.gnc_filerecord.set_data(gnc_file)
def _update_exg_file_mvr(self, mvr_filename, new_filename):
for exchange_file in self._exchange_files.values():
if (
hasattr(exchange_file, "mvr_filerecord")
and exchange_file.mvr_filerecord.has_data()
):
try:
mvr_file = exchange_file.mvr_filerecord.get_data()[0][0]
except MFDataException as mfde:
message = (
"An error occurred while retrieving the mover "
"file record from exchange file "
'"{}".'.format(exchange_file.filename)
)
raise MFDataException(
mfdata_except=mfde,
package=exchange_file._get_pname(),
message=message,
)
if mvr_file[0][0] == mvr_filename:
mvr_file[0][0] = new_filename
exchange_file.mvr_filerecord.set_data(mvr_file)
def _update_exg_file_mvt(self, mvt_filename, new_filename):
for exchange_file in self._exchange_files.values():
if (
hasattr(exchange_file, "mvt_filerecord")
and exchange_file.mvr_filerecord.has_data()
):
try:
mvt_file = exchange_file.mvt_filerecord.get_data()[0][0]
except MFDataException as mfde:
message = (
"An error occurred while retrieving the transport mover "
"file record from exchange file "
'"{}".'.format(exchange_file.filename)
)
raise MFDataException(
mfdata_except=mfde,
package=exchange_file._get_pname(),
message=message,
)
if mvt_file[0][0] == mvt_filename:
mvt_file[0][0] = new_filename
exchange_file.mvt_filerecord.set_data(mvt_file)
[docs] def plot(self, model_list=None, SelPackList=None, **kwargs):
"""
Plot simulation or models.
Method to plot a whole simulation or a series of models
that are part of a simulation.
Parameters
----------
model_list: (list)
List of model names to plot, if none all models will be plotted
SelPackList: (list)
List of package names to plot, if none all packages will be
plotted
kwargs:
filename_base : str
Base file name that will be used to automatically
generate file names for output image files. Plots will be
exported as image files if file_name_base is not None.
(default is None)
file_extension : str
Valid matplotlib.pyplot file extension for savefig().
Only used if filename_base is not None. (default is 'png')
mflay : int
MODFLOW zero-based layer number to return. If None, then
all layers will be included. (default is None)
kper : int
MODFLOW zero-based stress period number to return.
(default is zero)
key : str
MFList dictionary key. (default is None)
Returns
--------
axes: (list)
matplotlib.pyplot.axes objects
"""
from ...plot.plotutil import PlotUtilities
axes = PlotUtilities._plot_simulation_helper(
self, model_list=model_list, SelPackList=SelPackList, **kwargs
)
return axes