Source code for flopy.mf6.mfsimbase

import errno
import inspect
import os.path
import sys
import warnings
from pathlib import Path
from typing import List, Optional, Union

import numpy as np

from flopy.mbase import run_model
from flopy.mf6.data import mfdata, mfdatalist, mfstructure
from flopy.mf6.data.mfdatautil import MFComment
from flopy.mf6.data.mfstructure import DatumType
from flopy.mf6.mfbase import (
    ExtFileAction,
    FlopyException,
    MFDataException,
    MFFileMgmt,
    PackageContainer,
    PackageContainerType,
    VerbosityLevel,
)
from flopy.mf6.mfpackage import MFPackage
from flopy.mf6.modflow import mfnam, mftdis
from flopy.mf6.utils import binaryfile_utils, mfobservation


[docs]class SimulationDict(dict): """ Class containing custom dictionary for MODFLOW simulations. Dictionary contains model data. Dictionary keys are "paths" to the data that include the model and package containing the data. Behaves as an dict with some additional features described below. Parameters ---------- path : MFFileMgmt Object containing path information for the simulation """ def __init__(self, path=None): dict.__init__(self) self._path = path def __getitem__(self, key): """ Define the __getitem__ magic method. Parameters ---------- key (string): Part or all of a dictionary key Returns: MFData or numpy.ndarray """ if key == "_path" or not hasattr(self, "_path"): raise AttributeError(key) # FIX: Transport - Include transport output files if key[1] in ("CBC", "HDS", "DDN", "UCN"): val = binaryfile_utils.MFOutput(self, self._path, key) return val.data elif key[-1] == "Observations": val = mfobservation.MFObservation(self, self._path, key) return val.data if key in self: val = dict.__getitem__(self, key) return val return AttributeError(key) def __setitem__(self, key, val): """ Define the __setitem__ magic method. Parameters ---------- key : str Dictionary key val : MFData MFData to store in dictionary """ dict.__setitem__(self, key, val)
[docs] def find_in_path(self, key_path, key_leaf): """ Attempt to find key_leaf in a partial key path key_path. Parameters ---------- key_path : str partial path to the data key_leaf : str name of the data Returns ------- Data: MFData, index: int """ key_path_size = len(key_path) for key, item in self.items(): if key[:key_path_size] == key_path: if key[-1] == key_leaf: # found key_leaf as a key in the dictionary return item, None if not isinstance(item, MFComment): data_item_index = 0 data_item_structures = item.structure.data_item_structures for data_item_struct in data_item_structures: if data_item_struct.name == key_leaf: # found key_leaf as a data item name in the data # in the dictionary return item, data_item_index if data_item_struct.type != DatumType.keyword: data_item_index += 1 return None, None
[docs] def output_keys(self, print_keys=True): """ Return a list of output data keys supported by the dictionary. Parameters ---------- print_keys : bool print keys to console Returns ------- output keys : list """ # get keys to request binary output x = binaryfile_utils.MFOutputRequester.getkeys( self, self._path, print_keys=print_keys ) return [key for key in x.dataDict]
[docs] def input_keys(self): """ Return a list of input data keys. Returns ------- input keys : list """ # get keys to request input ie. package data for key in self: print(key)
[docs] def observation_keys(self): """ Return a list of observation keys. Returns ------- observation keys : list """ # get keys to request observation file output mfobservation.MFObservationRequester.getkeys(self, self._path)
[docs] def keys(self): """ Return a list of all keys. Returns ------- all keys : list """ # overrides the built in keys to print all keys, input and output self.input_keys() try: self.output_keys() except OSError as e: if e.errno == errno.EEXIST: pass try: self.observation_keys() except KeyError: pass
[docs]class MFSimulationData: """ Class containing MODFLOW simulation data and file formatting data. Use MFSimulationData to set simulation-wide settings which include data formatting and file location settings. Parameters ---------- path : str path on disk to the simulation Attributes ---------- indent_string : str String used to define how much indent to use (file formatting) internal_formatting : list List defining string to use for internal formatting external_formatting : list List defining string to use for external formatting open_close_formatting : list List defining string to use for open/close max_columns_of_data : int Maximum columns of data before line wraps. For structured grids this is set to ncol by default. For all other grids the default is 20. wrap_multidim_arrays : bool Whether to wrap line for multi-dimensional arrays at the end of a row/column/layer _float_precision : int Number of decimal points to write for a floating point number _float_characters : int Number of characters a floating point number takes up write_headers: bool When true flopy writes a header to each package file indicating that it was created by flopy sci_note_upper_thres : float Numbers greater than this threshold are written in scientific notation sci_note_lower_thres : float Numbers less than this threshold are written in scientific notation mfpath : MFFileMgmt File path location information for the simulation model_dimensions : dict Dictionary containing discretization information for each model mfdata : SimulationDict Custom dictionary containing all model data for the simulation """ def __init__(self, path: Union[str, os.PathLike], mfsim): # --- formatting variables --- self.indent_string = " " self.constant_formatting = ["constant", ""] self._max_columns_of_data = 20 self.wrap_multidim_arrays = True self._float_precision = 8 self._float_characters = 15 self.write_headers = True self._sci_note_upper_thres = 100000 self._sci_note_lower_thres = 0.001 self.fast_write = True self.comments_on = False self.auto_set_sizes = True self.verify_data = True self.debug = False self.verbose = True self.verbosity_level = VerbosityLevel.normal self.max_columns_user_set = False self.max_columns_auto_set = False self.use_pandas = True self._update_str_format() # --- file path --- self.mfpath = MFFileMgmt(path, mfsim) # --- ease of use variables to make working with modflow input and # output data easier --- model dimension class for each model self.model_dimensions = {} # --- model data --- self.mfdata = SimulationDict(self.mfpath) # --- temporary variables --- # other external files referenced self.referenced_files = {} @property def lazy_io(self): if not self.auto_set_sizes and not self.verify_data: return True return False @lazy_io.setter def lazy_io(self, val): if val: self.auto_set_sizes = False self.verify_data = False else: self.auto_set_sizes = True self.verify_data = True @property def max_columns_of_data(self): return self._max_columns_of_data @max_columns_of_data.setter def max_columns_of_data(self, val): if not self.max_columns_user_set and ( not self.max_columns_auto_set or val > self._max_columns_of_data ): self._max_columns_of_data = val self.max_columns_user_set = True @property def float_precision(self): """ Gets precision of floating point numbers. """ return self._float_precision @float_precision.setter def float_precision(self, value): """ Sets precision of floating point numbers. Parameters ---------- value: float floating point precision """ self._float_precision = value self._update_str_format() @property def float_characters(self): """ Gets max characters used in floating point numbers. """ return self._float_characters @float_characters.setter def float_characters(self, value): """ Sets max characters used in floating point numbers. Parameters ---------- value: float floating point max characters """ self._float_characters = value self._update_str_format()
[docs] def set_sci_note_upper_thres(self, value): """ Sets threshold number where any number larger than threshold is represented in scientific notation. Parameters ---------- value: float threshold value """ self._sci_note_upper_thres = value self._update_str_format()
[docs] def set_sci_note_lower_thres(self, value): """ Sets threshold number where any number smaller than threshold is represented in scientific notation. Parameters ---------- value: float threshold value """ self._sci_note_lower_thres = value self._update_str_format()
def _update_str_format(self): """ Update floating point formatting strings.""" self.reg_format_str = f"{{:.{self._float_precision}E}}" self.sci_format_str = ( f"{{:{self._float_characters}.{self._float_precision}f}}" )
[docs]class MFSimulationBase(PackageContainer): """ Entry point into any MODFLOW simulation. MFSimulation is used to load, build, and/or save a MODFLOW 6 simulation. A MFSimulation object must be created before creating any of the MODFLOW 6 model objects. Parameters ---------- sim_name : str Name of the simulation. version : str Version of MODFLOW 6 executable exe_name : str Path to MODFLOW 6 executable sim_ws : str Path to MODFLOW 6 simulation working folder. This is the folder containing the simulation name file. verbosity_level : int Verbosity level of standard output from 0 to 2. When 0 is specified no standard output is written. When 1 is specified standard error/warning messages with some informational messages are written. When 2 is specified full error/warning/informational messages are written (this is ideal for debugging). continue_ : bool Sets the continue option in the simulation name file. The continue option is a keyword flag to indicate that the simulation should continue even if one or more solutions do not converge. nocheck : bool Sets the nocheck option in the simulation name file. The nocheck option is a keyword flag to indicate that the model input check routines should not be called prior to each time step. Checks are performed by default. memory_print_option : str Sets memory_print_option in the simulation name file. Memory_print_option is a flag that controls printing of detailed memory manager usage to the end of the simulation list file. NONE means do not print detailed information. SUMMARY means print only the total memory for each simulation component. ALL means print information for each variable stored in the memory manager. NONE is default if memory_print_option is not specified. write_headers: bool When true flopy writes a header to each package file indicating that it was created by flopy. lazy_io: bool When true flopy only reads external data when the data is requested and only writes external data if the data has changed. This option automatically overrides the verify_data and auto_set_sizes, turning both off. use_pandas: bool Load/save data using pandas dataframes (for supported data) Examples -------- >>> s = MFSimulationBase.load('my simulation', 'simulation.nam') Attributes ---------- sim_name : str Name of the simulation name_file : MFPackage Simulation name file package """ def __init__( self, sim_name="sim", version="mf6", exe_name: Union[str, os.PathLike] = "mf6", sim_ws: Union[str, os.PathLike] = os.curdir, verbosity_level=1, continue_=None, nocheck=None, memory_print_option=None, write_headers=True, lazy_io=False, use_pandas=True, ): super().__init__(MFSimulationData(sim_ws, self), sim_name) self.simulation_data.verbosity_level = self._resolve_verbosity_level( verbosity_level ) self.simulation_data.write_headers = write_headers self.simulation_data.use_pandas = use_pandas if lazy_io: self.simulation_data.lazy_io = True # verify metadata fpdata = mfstructure.MFStructure() if not fpdata.valid: excpt_str = ( "Invalid package metadata. Unable to load MODFLOW " "file structure metadata." ) raise FlopyException(excpt_str) # initialize self.dimensions = None self.type = "Simulation" self.version = version self.exe_name = exe_name self._models = {} self._tdis_file = None self._exchange_files = {} self._solution_files = {} self._other_files = {} self.structure = fpdata.sim_struct self.model_type = None self._exg_file_num = {} self.simulation_data.mfpath.set_last_accessed_path() # build simulation name file self.name_file = mfnam.ModflowNam( self, filename="mfsim.nam", continue_=continue_, nocheck=nocheck, memory_print_option=memory_print_option, _internal_package=True, ) # try to build directory structure sim_path = self.simulation_data.mfpath.get_sim_path() if not os.path.isdir(sim_path): try: os.makedirs(sim_path) except OSError as e: if ( self.simulation_data.verbosity_level.value >= VerbosityLevel.quiet.value ): print( "An error occurred when trying to create the " "directory {}: {}".format(sim_path, e.strerror) ) # set simulation validity initially to false since the user must first # add at least one model to the simulation and fill out the name and # tdis files self.valid = False def __getattr__(self, item): """ Override __getattr__ to allow retrieving models. __getattr__ is used to allow for getting models and packages as if they are attributes Parameters ---------- item : str model or package name Returns ------- md : Model or package object Model or package object of type :class:flopy6.mfmodel or :class:flopy6.mfpackage """ if item == "valid" or not hasattr(self, "valid"): raise AttributeError(item) models = [] if item in self.structure.model_types: # get all models of this type for model in self._models.values(): if model.model_type == item or model.model_type[:-1] == item: models.append(model) if len(models) > 0: return models elif item in self._models: model = self.get_model(item) if model is not None: return model raise AttributeError(item) else: package = self.get_package(item) if package is not None: return package raise AttributeError(item) def __setattr__(self, name, value): if hasattr(self, name) and getattr(self, name) is not None: attribute = object.__getattribute__(self, name) if attribute is not None and isinstance(attribute, mfdata.MFData): try: if isinstance(attribute, mfdatalist.MFList): attribute.set_data(value, autofill=True) else: attribute.set_data(value) except MFDataException as mfde: raise MFDataException( mfdata_except=mfde, model="", package="", ) return super().__setattr__(name, value) def __repr__(self): """ Override __repr__ to print custom string. Returns -------- repr string : str string describing object """ return self._get_data_str(True) def __str__(self): """ Override __str__ to print custom string. Returns -------- str string : str string describing object """ return self._get_data_str(False) def _get_data_str(self, formal): file_mgt = self.simulation_data.mfpath data_str = ( "sim_name = {}\nsim_path = {}\nexe_name = " "{}\n" "\n".format( self.name, file_mgt.get_sim_path(), self.exe_name ) ) for package in self._packagelist: pk_str = package._get_data_str(formal, False) if formal: if len(pk_str.strip()) > 0: data_str = ( "{}###################\nPackage {}\n" "###################\n\n" "{}\n".format(data_str, package._get_pname(), pk_str) ) else: if len(pk_str.strip()) > 0: data_str = ( "{}###################\nPackage {}\n" "###################\n\n" "{}\n".format(data_str, package._get_pname(), pk_str) ) for model in self._models.values(): if formal: mod_repr = repr(model) if len(mod_repr.strip()) > 0: data_str = ( "{}@@@@@@@@@@@@@@@@@@@@\nModel {}\n" "@@@@@@@@@@@@@@@@@@@@\n\n" "{}\n".format(data_str, model.name, mod_repr) ) else: mod_str = str(model) if len(mod_str.strip()) > 0: data_str = ( "{}@@@@@@@@@@@@@@@@@@@@\nModel {}\n" "@@@@@@@@@@@@@@@@@@@@\n\n" "{}\n".format(data_str, model.name, mod_str) ) return data_str @property def model_names(self): """ Return a list of model names associated with this simulation. Returns -------- list: list of model names """ return list(self._models.keys()) @property def exchange_files(self): """ Return list of exchange files associated with this simulation. Returns -------- list: list of exchange names """ return self._exchange_files.values()
[docs] @staticmethod def load( cls_child, sim_name="modflowsim", version="mf6", exe_name: Union[str, os.PathLike] = "mf6", sim_ws: Union[str, os.PathLike] = os.curdir, strict=True, verbosity_level=1, load_only=None, verify_data=False, write_headers=True, lazy_io=False, use_pandas=True, ): """ Load an existing model. Do not call this method directly. Should only be called by child class. Parameters ---------- cls_child : cls object of child class calling load sim_name : str Name of the simulation. version : str MODFLOW version exe_name : str or PathLike Path to MODFLOW executable (relative to the simulation workspace or absolute) sim_ws : str or PathLike Path to simulation workspace strict : bool Strict enforcement of file formatting verbosity_level : int Verbosity level of standard output 0: No standard output 1: Standard error/warning messages with some informational messages 2: Verbose mode with full error/warning/informational messages. This is ideal for debugging. load_only : list List of package abbreviations or package names corresponding to packages that flopy will load. default is None, which loads all packages. the discretization packages will load regardless of this setting. subpackages, like time series and observations, will also load regardless of this setting. example list: ['ic', 'maw', 'npf', 'oc', 'ims', 'gwf6-gwf6'] verify_data : bool Verify data when it is loaded. this can slow down loading write_headers: bool When true flopy writes a header to each package file indicating that it was created by flopy lazy_io: bool When true flopy only reads external data when the data is requested and only writes external data if the data has changed. This option automatically overrides the verify_data and auto_set_sizes, turning both off. use_pandas: bool Load/save data using pandas dataframes (for supported data) Returns ------- sim : MFSimulation object Examples -------- >>> s = flopy.mf6.mfsimulation.load('my simulation') """ # initialize instance = cls_child( sim_name, version, exe_name, sim_ws, verbosity_level, write_headers=write_headers, use_pandas=use_pandas, ) verbosity_level = instance.simulation_data.verbosity_level instance.simulation_data.verify_data = verify_data if lazy_io: instance.simulation_data.lazy_io = True if verbosity_level.value >= VerbosityLevel.normal.value: print("loading simulation...") # build case consistent load_only dictionary for quick lookups load_only = instance._load_only_dict(load_only) # load simulation name file if verbosity_level.value >= VerbosityLevel.normal.value: print(" loading simulation name file...") instance.name_file.load(strict) # load TDIS file tdis_pkg = f"tdis{mfstructure.MFStructure().get_version_string()}" tdis_attr = getattr(instance.name_file, tdis_pkg) instance._tdis_file = mftdis.ModflowTdis( instance, filename=tdis_attr.get_data() ) instance._tdis_file._filename = instance.simulation_data.mfdata[ ("nam", "timing", tdis_pkg) ].get_data() if verbosity_level.value >= VerbosityLevel.normal.value: print(" loading tdis package...") instance._tdis_file.load(strict) # load models try: model_recarray = instance.simulation_data.mfdata[ ("nam", "models", "models") ] models = model_recarray.get_data() except MFDataException as mfde: message = ( "Error occurred while loading model names from the " "simulation name file." ) raise MFDataException( mfdata_except=mfde, model=instance.name, package="nam", message=message, ) if models is None: return instance for item in models: # resolve model working folder and name file path, name_file = os.path.split(item[1]) model_obj = PackageContainer.model_factory(item[0][:-1].lower()) # load model if verbosity_level.value >= VerbosityLevel.normal.value: print(f" loading model {item[0].lower()}...") instance._models[item[2]] = model_obj.load( instance, instance.structure.model_struct_objs[item[0].lower()], item[2], name_file, version, exe_name, strict, path, load_only, ) # load exchange packages and dependent packages try: exchange_recarray = instance.name_file.exchanges has_exch_data = exchange_recarray.has_data() except MFDataException as mfde: message = ( "Error occurred while loading exchange names from the " "simulation name file." ) raise MFDataException( mfdata_except=mfde, model=instance.name, package="nam", message=message, ) if has_exch_data: try: exch_data = exchange_recarray.get_data() except MFDataException as mfde: message = ( "Error occurred while loading exchange names from " "the simulation name file." ) raise MFDataException( mfdata_except=mfde, model=instance.name, package="nam", message=message, ) for exgfile in exch_data: if load_only is not None and not instance._in_pkg_list( load_only, exgfile[0], exgfile[2] ): if ( instance.simulation_data.verbosity_level.value >= VerbosityLevel.normal.value ): print(f" skipping package {exgfile[0].lower()}...") continue # get exchange type by removing numbers from exgtype exchange_type = "".join( [char for char in exgfile[0] if not char.isdigit()] ).upper() # get exchange number for this type if exchange_type not in instance._exg_file_num: exchange_file_num = 0 instance._exg_file_num[exchange_type] = 1 else: exchange_file_num = instance._exg_file_num[exchange_type] instance._exg_file_num[exchange_type] += 1 exchange_name = f"{exchange_type}_EXG_{exchange_file_num}" # find package class the corresponds to this exchange type package_obj = instance.package_factory( exchange_type.replace("-", "").lower(), "" ) if not package_obj: message = ( "An error occurred while loading the " "simulation name file. Invalid exchange type " '"{}" specified.'.format(exchange_type) ) type_, value_, traceback_ = sys.exc_info() raise MFDataException( instance.name, "nam", "nam", "loading simulation name file", exchange_recarray.structure.name, inspect.stack()[0][3], type_, value_, traceback_, message, instance._simulation_data.debug, ) # build and load exchange package object exchange_file = package_obj( instance, exgtype=exgfile[0], exgmnamea=exgfile[2], exgmnameb=exgfile[3], filename=exgfile[1], pname=exchange_name, loading_package=True, ) if verbosity_level.value >= VerbosityLevel.normal.value: print( f" loading exchange package {exchange_file._get_pname()}..." ) exchange_file.load(strict) instance._add_package(exchange_file, exchange_file.path) instance._exchange_files[exgfile[1]] = exchange_file # load simulation packages solution_recarray = instance.simulation_data.mfdata[ ("nam", "solutiongroup", "solutiongroup") ] try: solution_group_dict = solution_recarray.get_data() except MFDataException as mfde: message = ( "Error occurred while loading solution groups from " "the simulation name file." ) raise MFDataException( mfdata_except=mfde, model=instance.name, package="nam", message=message, ) for solution_group in solution_group_dict.values(): for solution_info in solution_group: if load_only is not None and not instance._in_pkg_list( load_only, solution_info[0], solution_info[2] ): if ( instance.simulation_data.verbosity_level.value >= VerbosityLevel.normal.value ): print( f" skipping package {solution_info[0].lower()}..." ) continue # create solution package sln_package_obj = instance.package_factory( solution_info[0][:-1].lower(), "" ) sln_package = sln_package_obj( instance, filename=solution_info[1], pname=solution_info[2], ) if verbosity_level.value >= VerbosityLevel.normal.value: print( f" loading solution package " f"{sln_package._get_pname()}..." ) sln_package.load(strict) instance.simulation_data.mfpath.set_last_accessed_path() if verify_data: instance.check() return instance
[docs] def check( self, f: Optional[Union[str, os.PathLike]] = None, verbose=True, level=1, ): """ Check model data for common errors. Parameters ---------- f : str or PathLike, optional String defining file name or file handle for summary file of check method output. If str or pathlike, a file handle is created. If None, the method does not write results to a summary file. (default is None) verbose : bool Boolean flag used to determine if check method results are written to the screen level : int Check method analysis level. If level=0, summary checks are performed. If level=1, full checks are performed. Returns ------- check list: list Python list containing simulation check results Examples -------- >>> import flopy >>> m = flopy.modflow.Modflow.load('model.nam') >>> m.check() """ # check instance for simulation-level check chk_list = [] # check models for model in self._models.values(): print(f'Checking model "{model.name}"...') chk_list.append(model.check(f, verbose, level)) print("Checking for missing simulation packages...") if self._tdis_file is None: if chk_list: chk_list[0]._add_to_summary( "Error", desc="\r No tdis package", package="model" ) print("Error: no tdis package") if len(self._solution_files) == 0: if chk_list: chk_list[0]._add_to_summary( "Error", desc="\r No solver package", package="model" ) print("Error: no solution package") return chk_list
@property def sim_path(self) -> Path: return Path(self.simulation_data.mfpath.get_sim_path()) @property def sim_package_list(self): """List of all "simulation level" packages""" package_list = [] if self._tdis_file is not None: package_list.append(self._tdis_file) for sim_package in self._solution_files.values(): package_list.append(sim_package) for sim_package in self._exchange_files.values(): package_list.append(sim_package) for sim_package in self._other_files.values(): package_list.append(sim_package) return package_list
[docs] def load_package( self, ftype, fname: Union[str, os.PathLike], pname, strict, ref_path: Union[str, os.PathLike], dict_package_name=None, parent_package=None, ): """ Load a package from a file. Parameters ---------- ftype : str the file type fname : str or PathLike the path of the file containing the package input pname : str the user-defined name for the package strict : bool strict mode when loading the file ref_path : str path to the file. uses local path if set to None dict_package_name : str package name for dictionary lookup parent_package : MFPackage parent package """ if ( ftype in self.structure.package_struct_objs and self.structure.package_struct_objs[ftype].multi_package_support ) or ( ftype in self.structure.utl_struct_objs and self.structure.utl_struct_objs[ftype].multi_package_support ): # resolve dictionary name for package if dict_package_name is not None: if parent_package is not None: dict_package_name = f"{parent_package.path[-1]}_{ftype}" else: # use dict_package_name as the base name if ftype in self._ftype_num_dict: self._ftype_num_dict[dict_package_name] += 1 else: self._ftype_num_dict[dict_package_name] = 0 dict_package_name = "{}_{}".format( dict_package_name, self._ftype_num_dict[dict_package_name], ) else: # use ftype as the base name if ftype in self._ftype_num_dict: self._ftype_num_dict[ftype] += 1 else: self._ftype_num_dict[ftype] = 0 if pname is not None: dict_package_name = pname else: dict_package_name = ( f"{ftype}_{self._ftype_num_dict[ftype]}" ) else: dict_package_name = ftype # get package object from file type package_obj = self.package_factory(ftype, "") # create package package = package_obj( self, filename=fname, pname=dict_package_name, parent_file=parent_package, loading_package=True, ) package.load(strict) self._other_files[package.filename] = package # register child package with the simulation self._add_package(package, package.path) if parent_package is not None: # register child package with the parent package parent_package._add_package(package, package.path) return package
[docs] def register_ims_package( self, solution_file: MFPackage, model_list: Union[str, List[str]] ): self.register_solution_package(solution_file, model_list)
[docs] def register_solution_package( self, solution_file: MFPackage, model_list: Union[str, List[str]] ): """ Register a solution package with the simulation. Parameters solution_file : MFPackage solution package to register model_list : list of strings list of models using the solution package to be registered """ if isinstance(model_list, str): model_list = [model_list] if ( solution_file.package_type not in mfstructure.MFStructure().flopy_dict["solution_packages"] ): comment = ( 'Parameter "solution_file" is not a valid solution file. ' 'Expected solution file, but type "{}" was given' ".".format(type(solution_file)) ) type_, value_, traceback_ = sys.exc_info() raise MFDataException( None, "solution", "", "registering solution package", "", inspect.stack()[0][3], type_, value_, traceback_, comment, self.simulation_data.debug, ) valid_model_types = mfstructure.MFStructure().flopy_dict[ "solution_packages" ][solution_file.package_type] # remove models from existing solution groups if model_list is not None: for model in model_list: md = self.get_model(model) if md is not None and ( md.model_type not in valid_model_types and "*" not in valid_model_types ): comment = ( f"Model type {md.model_type} is not a valid type " f"for solution file {solution_file.filename} solution " f"file type {solution_file.package_type}. Valid model " f"types are {valid_model_types}" ) type_, value_, traceback_ = sys.exc_info() raise MFDataException( None, "solution", "", "registering solution package", "", inspect.stack()[0][3], type_, value_, traceback_, comment, self.simulation_data.debug, ) self._remove_from_all_solution_groups(model) # register solution package with model list in_simulation = False pkg_with_same_name = None for file in self._solution_files.values(): if file is solution_file: in_simulation = True if ( file.package_name == solution_file.package_name and file != solution_file ): pkg_with_same_name = file if ( self.simulation_data.verbosity_level.value >= VerbosityLevel.normal.value ): print( "WARNING: solution package with name {} already exists. " "New solution package will replace old package" ".".format(file.package_name) ) self._remove_package(self._solution_files[file.filename]) del self._solution_files[file.filename] break # register solution package if not in_simulation: self._add_package( solution_file, self._get_package_path(solution_file) ) # do not allow a solution package to be registered twice with the # same simulation if not in_simulation: # create unique file/package name if solution_file.package_name is None: file_num = len(self._solution_files) - 1 solution_file.package_name = ( f"{solution_file.package_type}_{file_num}" ) if solution_file.filename in self._solution_files: solution_file.filename = MFFileMgmt.unique_file_name( solution_file.filename, self._solution_files ) # add solution package to simulation self._solution_files[solution_file.filename] = solution_file # If solution file is being replaced, replace solution filename in # solution group if pkg_with_same_name is not None and self._is_in_solution_group( pkg_with_same_name.filename, 1 ): # change existing solution group to reflect new solution file self._replace_solution_in_solution_group( pkg_with_same_name.filename, 1, solution_file.filename ) # only allow solution package to be registered to one solution group elif model_list is not None: sln_file_in_group = self._is_in_solution_group( solution_file.filename, 1 ) # add solution group to the simulation name file solution_recarray = self.name_file.solutiongroup solution_group_list = solution_recarray.get_active_key_list() if len(solution_group_list) == 0: solution_group_num = 0 else: solution_group_num = solution_group_list[-1][0] if sln_file_in_group: self._append_to_solution_group( solution_file.filename, model_list ) else: if self.name_file.mxiter.get_data(solution_group_num) is None: self.name_file.mxiter.add_transient_key(solution_group_num) # associate any models in the model list to this # simulation file version_string = mfstructure.MFStructure().get_version_string() solution_pkg = f"{solution_file.package_abbr}{version_string}" new_record = [solution_pkg, solution_file.filename] for model in model_list: new_record.append(model) try: solution_recarray.append_list_as_record( new_record, solution_group_num ) except MFDataException as mfde: message = ( "Error occurred while updating the " "simulation name file with the solution package " 'file "{}".'.format(solution_file.filename) ) raise MFDataException( mfdata_except=mfde, package="nam", message=message )
def _create_package(self, package_type, package_data): if package_data is None: return None if not isinstance(package_data, dict): message = ( "Error occurred while creating the solution package " f"{package_type}. Package data must be provided in a " f"dictionary. User provided type {type(package_data)}." ) raise MFDataException(package=package_type, message=message) # find package - only supporting utl packages for now package_obj = self.package_factory(package_type, "utl") if package_obj is not None: # determine file name if "filename" not in package_data: package_data["filename"] = f"{self.name}.{package_type}" # create package which should automatically register with the # simulation pkg = package_obj(self, **package_data) @staticmethod def _rename_package_group(group_dict, name): package_type_count = {} # first build an array to avoid key modification errors package_array = [] for package in group_dict.values(): package_array.append(package) # update package file names and count for package in package_array: if package.package_type not in package_type_count: file_name = f"{name}.{package.package_type}" package_type_count[package.package_type] = 1 else: package_type_count[package.package_type] += 1 ptc = package_type_count[package.package_type] file_name = f"{name}_{ptc}.{package.package_type}" base_filepath = os.path.split(package.filename)[0] if base_filepath != "": # include existing relative path in new file name file_name = os.path.join(base_filepath, file_name) package.filename = file_name def _rename_exchange_file(self, package, new_filename): self._exchange_files[package.filename] = package try: exchange_recarray_data = self.name_file.exchanges.get_data() except MFDataException as mfde: message = ( "An error occurred while retrieving exchange " "data from the simulation name file. The error " "occurred while registering exchange file " f'"{package.filename}".' ) raise MFDataException( mfdata_except=mfde, package=package._get_pname(), message=message, ) if exchange_recarray_data is not None: for index, exchange in zip( range(0, len(exchange_recarray_data)), exchange_recarray_data, ): if exchange[1] == package.filename: # update existing exchange exchange_recarray_data[index][1] = new_filename ex_recarray = self.name_file.exchanges try: ex_recarray.set_data(exchange_recarray_data) except MFDataException as mfde: message = ( "An error occurred while setting " "exchange data in the simulation name " "file. The error occurred while " "registering the following " "values (exgtype, filename, " f'exgmnamea, exgmnameb): "{package.exgtype} ' f"{package.filename} {package.exgmnamea}" f'{package.exgmnameb}".' ) raise MFDataException( mfdata_except=mfde, package=package._get_pname(), message=message, ) return def _set_timing_block(self, file_name): struct_root = mfstructure.MFStructure() tdis_pkg = f"tdis{struct_root.get_version_string()}" tdis_attr = getattr(self.name_file, tdis_pkg) try: tdis_attr.set_data(file_name) except MFDataException as mfde: message = ( "An error occurred while setting the tdis package " f'file name "{file_name}". The error occurred while ' "registering the tdis package with the " "simulation" ) raise MFDataException( mfdata_except=mfde, package=file_name, message=message, )
[docs] def update_package_filename(self, package, new_name): """ Updates internal arrays to be consistent with a new file name. This is for internal flopy library use only. Parameters ---------- package: MFPackage Package with new name new_name: str Package's new name """ if ( self._tdis_file is not None and package.filename == self._tdis_file.filename ): self._set_timing_block(new_name) elif package.filename in self._exchange_files: self._exchange_files[new_name] = self._exchange_files.pop( package.filename ) self._rename_exchange_file(package, new_name) elif package.filename in self._solution_files: self._solution_files[new_name] = self._solution_files.pop( package.filename ) self._update_solution_group(package.filename, new_name) else: self._other_files[new_name] = self._other_files.pop( package.filename )
[docs] def rename_all_packages(self, name): """ Rename all packages with name as prefix. Parameters ---------- name: str Prefix of package names """ if self._tdis_file is not None: self._tdis_file.filename = f"{name}.{self._tdis_file.package_type}" self._rename_package_group(self._exchange_files, name) self._rename_package_group(self._solution_files, name) self._rename_package_group(self._other_files, name) for model in self._models.values(): model.rename_all_packages(name)
[docs] def set_all_data_external( self, check_data=True, external_data_folder=None, base_name=None, binary=False, ): """Sets the simulation's list and array data to be stored externally. Parameters ---------- check_data: bool Determines if data error checking is enabled during this process. Data error checking can be slow on large datasets. external_data_folder: str or PathLike Path relative to the simulation path or model relative path (see use_model_relative_path parameter), where external data will be stored base_name: str Base file name prefix for all files binary: bool Whether file will be stored as binary """ # copy any files whose paths have changed self.simulation_data.mfpath.copy_files() # set data external for all packages in all models for model in self._models.values(): model.set_all_data_external( check_data, external_data_folder, base_name, binary, ) # set data external for solution packages for package in self._solution_files.values(): package.set_all_data_external( check_data, external_data_folder, base_name, binary, ) # set data external for other packages for package in self._other_files.values(): package.set_all_data_external( check_data, external_data_folder, base_name, binary, ) for package in self._exchange_files.values(): package.set_all_data_external( check_data, external_data_folder, base_name, binary, )
[docs] def set_all_data_internal(self, check_data=True): # set data external for all packages in all models for model in self._models.values(): model.set_all_data_internal(check_data) # set data external for solution packages for package in self._solution_files.values(): package.set_all_data_internal(check_data) # set data external for other packages for package in self._other_files.values(): package.set_all_data_internal(check_data) # set data external for exchange packages for package in self._exchange_files.values(): package.set_all_data_internal(check_data)
[docs] def write_simulation( self, ext_file_action=ExtFileAction.copy_relative_paths, silent=False ): """ Write the simulation to files. Parameters ext_file_action : ExtFileAction Defines what to do with external files when the simulation path has changed. Defaults to copy_relative_paths which copies only files with relative paths, leaving files defined by absolute paths fixed. silent : bool Writes out the simulation in silent mode (verbosity_level = 0) """ sim_data = self.simulation_data if not sim_data.max_columns_user_set: # search for dis packages for model in self._models.values(): dis = model.get_package("dis", type_only=True) if dis is not None and hasattr(dis, "ncol"): sim_data.max_columns_of_data = dis.ncol.get_data() sim_data.max_columns_user_set = False sim_data.max_columns_auto_set = True saved_verb_lvl = self.simulation_data.verbosity_level if silent: self.simulation_data.verbosity_level = VerbosityLevel.quiet # write simulation name file if ( self.simulation_data.verbosity_level.value >= VerbosityLevel.normal.value ): print("writing simulation...") print(" writing simulation name file...") self.name_file.write(ext_file_action=ext_file_action) # write TDIS file if ( self.simulation_data.verbosity_level.value >= VerbosityLevel.normal.value ): print(" writing simulation tdis package...") self._tdis_file.write(ext_file_action=ext_file_action) # write solution files for solution_file in self._solution_files.values(): if ( self.simulation_data.verbosity_level.value >= VerbosityLevel.normal.value ): print( f" writing solution package " f"{solution_file._get_pname()}..." ) solution_file.write(ext_file_action=ext_file_action) # write exchange files for exchange_file in self._exchange_files.values(): exchange_file.write() # write other packages for pp in self._other_files.values(): if ( self.simulation_data.verbosity_level.value >= VerbosityLevel.normal.value ): print(f" writing package {pp._get_pname()}...") pp.write(ext_file_action=ext_file_action) # FIX: model working folder should be model name file folder # write models for model in self._models.values(): if ( self.simulation_data.verbosity_level.value >= VerbosityLevel.normal.value ): print(f" writing model {model.name}...") model.write(ext_file_action=ext_file_action) self.simulation_data.mfpath.set_last_accessed_path() if silent: self.simulation_data.verbosity_level = saved_verb_lvl
[docs] def set_sim_path(self, path: Union[str, os.PathLike]): """Return a list of output data keys. Parameters ---------- path : str Relative or absolute path to simulation root folder. """ # set all data internal self.set_all_data_internal() # set simulation path self.simulation_data.mfpath.set_sim_path(path, True) if not os.path.exists(path): # create new simulation folder os.makedirs(path)
[docs] def run_simulation( self, silent=None, pause=False, report=False, processors=None, normal_msg="normal termination", use_async=False, cargs=None, custom_print=None, ): """ Run the simulation. Parameters ---------- silent: bool Run in silent mode pause: bool Pause at end of run report: bool Save stdout lines to a list (buff) processors: int Number of processors. Parallel simulations are only supported for MODFLOW 6 simulations. (default is None) normal_msg: str or list Normal termination message used to determine if the run terminated normally. More than one message can be provided using a list. (default is 'normal termination') use_async : bool Asynchronously read model stdout and report with timestamps. good for models that take long time to run. not good for models that run really fast cargs : str or list of strings Additional command line arguments to pass to the executable. default is None custom_print: callable Optional callbale for printing. It will replace the builtin print function. This is useful for shorter prints or integration into other systems such as GUIs. default is None, i.e. use the builtion print Returns -------- success : bool buff : list of lines of stdout """ if silent is None: if ( self.simulation_data.verbosity_level.value >= VerbosityLevel.normal.value ): silent = False else: silent = True return run_model( self.exe_name, None, self.simulation_data.mfpath.get_sim_path(), silent=silent, pause=pause, report=report, processors=processors, normal_msg=normal_msg, use_async=use_async, cargs=cargs, custom_print=custom_print, )
[docs] def delete_output_files(self): """Deletes simulation output files.""" output_req = binaryfile_utils.MFOutputRequester output_file_keys = output_req.getkeys( self.simulation_data.mfdata, self.simulation_data.mfpath, False ) for path in output_file_keys.binarypathdict.values(): if os.path.isfile(path): os.remove(path)
[docs] def remove_package(self, package_name): """ Removes package from the simulation. `package_name` can be the package's name, type, or package object to be removed from the model. Parameters ---------- package_name : str Name of package to be removed """ if isinstance(package_name, MFPackage): packages = [package_name] else: packages = self.get_package(package_name) if not isinstance(packages, list): packages = [packages] for package in packages: if ( self._tdis_file is not None and package.path == self._tdis_file.path ): self._tdis_file = None if package.filename in self._exchange_files: del self._exchange_files[package.filename] if package.filename in self._solution_files: del self._solution_files[package.filename] self._update_solution_group(package.filename) if package.filename in self._other_files: del self._other_files[package.filename] self._remove_package(package) # if this is a package referenced from a filerecord, remove filerecord # from name file file_record_name = f"_{package.package_type}_filerecord" if hasattr(self.name_file, file_record_name): file_record = getattr(self.name_file, file_record_name) if isinstance(file_record, mfdata.MFData): file_record.set_data(None) if hasattr(self.name_file, package.package_type): child_pkgs = getattr(self.name_file, package.package_type) child_pkgs._remove_packages(package.filename, True)
@property def model_dict(self): """ Return a dictionary of models associated with this simulation. Returns -------- model dict : dict dictionary of models """ return self._models.copy()
[docs] def get_model(self, model_name=None): """ Returns the models in the simulation with a given model name, name file name, or model type. Parameters ---------- model_name : str Name of the model to get. Passing in None or an empty list will get the first model. Returns -------- model : MFModel """ if len(self._models) == 0: return None if model_name is None: for model in self._models.values(): return model if model_name in self._models: return self._models[model_name] # do case-insensitive lookup for name, model in self._models.items(): if model_name.lower() == name.lower(): return model return None
[docs] def get_exchange_file(self, filename): """ Get a specified exchange file. Parameters ---------- filename : str Name of exchange file to get Returns -------- exchange package : MFPackage """ if filename in self._exchange_files: return self._exchange_files[filename] else: excpt_str = f'Exchange file "{filename}" can not be found.' raise FlopyException(excpt_str)
[docs] def get_file(self, filename): """ Get a specified file. Parameters ---------- filename : str Name of mover file to get Returns -------- mover package : MFPackage """ if filename in self._other_files: return self._other_files[filename] else: excpt_str = f'file "{filename}" can not be found.' raise FlopyException(excpt_str)
[docs] def remove_exchange_file(self, package): """ Removes the exchange file "package". This is for internal flopy library use only. Parameters ---------- package: MFPackage Exchange package to be removed """ self._exchange_files[package.filename] = package try: exchange_recarray_data = self.name_file.exchanges.get_data() except MFDataException as mfde: message = ( "An error occurred while retrieving exchange " "data from the simulation name file. The error " "occurred while registering exchange file " f'"{package.filename}".' ) raise MFDataException( mfdata_except=mfde, package=package._get_pname(), message=message, ) remove_indices = [] if exchange_recarray_data is not None: for index, exchange in zip( range(0, len(exchange_recarray_data)), exchange_recarray_data, ): if ( package.filename is not None and exchange[1] == package.filename ): remove_indices.append(index) if len(remove_indices) > 0: self.name_file.exchanges.set_data( np.delete(exchange_recarray_data, remove_indices) )
[docs] def register_exchange_file(self, package): """ Register an exchange package file with the simulation. This is a call-back method made from the package and should not be called directly. Parameters ---------- package : MFPackage Exchange package object to register """ if package.filename not in self._exchange_files: exgtype = package.exgtype exgmnamea = package.exgmnamea exgmnameb = package.exgmnameb if exgtype is None or exgmnamea is None or exgmnameb is None: excpt_str = ( "Exchange packages require that exgtype, " "exgmnamea, and exgmnameb are specified." ) raise FlopyException(excpt_str) self._exchange_files[package.filename] = package try: exchange_recarray_data = self.name_file.exchanges.get_data() except MFDataException as mfde: message = ( "An error occurred while retrieving exchange " "data from the simulation name file. The error " "occurred while registering exchange file " f'"{package.filename}".' ) raise MFDataException( mfdata_except=mfde, package=package._get_pname(), message=message, ) if exchange_recarray_data is not None: for index, exchange in zip( range(0, len(exchange_recarray_data)), exchange_recarray_data, ): if exchange[1] == package.filename: # update existing exchange exchange_recarray_data[index][0] = exgtype exchange_recarray_data[index][2] = exgmnamea exchange_recarray_data[index][3] = exgmnameb ex_recarray = self.name_file.exchanges try: ex_recarray.set_data(exchange_recarray_data) except MFDataException as mfde: message = ( "An error occurred while setting " "exchange data in the simulation name " "file. The error occurred while " "registering the following " "values (exgtype, filename, " f'exgmnamea, exgmnameb): "{exgtype} ' f"{package.filename} {exgmnamea}" f'{exgmnameb}".' ) raise MFDataException( mfdata_except=mfde, package=package._get_pname(), message=message, ) return try: # add new exchange self.name_file.exchanges.append_data( [(exgtype, package.filename, exgmnamea, exgmnameb)] ) except MFDataException as mfde: message = ( "An error occurred while setting exchange data " "in the simulation name file. The error occurred " "while registering the following values (exgtype, " f'filename, exgmnamea, exgmnameb): "{exgtype} ' f'{package.filename} {exgmnamea} {exgmnameb}".' ) raise MFDataException( mfdata_except=mfde, package=package._get_pname(), message=message, ) if ( package.dimensions is None or package.dimensions.model_dim is None ): # resolve exchange package dimensions object package.dimensions = package.create_package_dimensions()
def _remove_package_by_type(self, package): pname = None if package.package_name is not None: pname = package.package_name.lower() if ( package.package_type.lower() == "tdis" and self._tdis_file is not None and self._tdis_file in self._packagelist ): # tdis package already exists. there can be only one tdis # package. remove existing tdis package if ( self.simulation_data.verbosity_level.value >= VerbosityLevel.normal.value ): print( "WARNING: tdis package already exists. Replacing " "existing tdis package." ) self._remove_package(self._tdis_file) elif ( package.package_type.lower() in mfstructure.MFStructure().flopy_dict["solution_packages"] and pname in self.package_name_dict ): if ( self.simulation_data.verbosity_level.value >= VerbosityLevel.normal.value ): print( "WARNING: Package with name " f"{package.package_name.lower()} already exists. " "Replacing existing package." ) self._remove_package(self.package_name_dict[pname]) else: if ( package.filename in self._other_files and self._other_files[package.filename] in self._packagelist ): # other package with same file name already exists. remove old # package if ( self.simulation_data.verbosity_level.value >= VerbosityLevel.normal.value ): print( f"WARNING: package with name {pname} already exists. " "Replacing existing package." ) self._remove_package(self._other_files[package.filename]) del self._other_files[package.filename]
[docs] def register_package( self, package, add_to_package_list=True, set_package_name=True, set_package_filename=True, ): """ Register a package file with the simulation. This is a call-back method made from the package and should not be called directly. Parameters ---------- package : MFPackage Package to register add_to_package_list : bool Add package to lookup list set_package_name : bool Produce a package name for this package set_package_filename : bool Produce a filename for this package Returns -------- (path : tuple, package structure : MFPackageStructure) """ if set_package_filename: # set initial package filename base_name = os.path.basename(os.path.normpath(self.name)) package._filename = f"{base_name}.{package.package_type}" package.container_type = [PackageContainerType.simulation] path = self._get_package_path(package) if add_to_package_list and package.package_type.lower != "nam": if ( package.package_type.lower() not in mfstructure.MFStructure().flopy_dict[ "solution_packages" ] ): # all but solution packages get added here. solution packages # are added during solution package registration self._remove_package_by_type(package) self._add_package(package, path) sln_dict = mfstructure.MFStructure().flopy_dict["solution_packages"] if package.package_type.lower() == "nam": if not package.internal_package: excpt_str = ( "Unable to register nam file. Do not create your own nam " "files. Nam files are automatically created and managed " "for you by FloPy." ) print(excpt_str) raise FlopyException(excpt_str) return path, self.structure.name_file_struct_obj elif package.package_type.lower() == "tdis": self._tdis_file = package self._set_timing_block(package.quoted_filename) return ( path, self.structure.package_struct_objs[ package.package_type.lower() ], ) elif package.package_type.lower() in sln_dict: supported_packages = sln_dict[package.package_type.lower()] # default behavior is to register the solution package with the # first unregistered model unregistered_models = [] for model_name, model in self._models.items(): model_registered = self._is_in_solution_group( model_name, 2, True ) if not model_registered and ( model.model_type in supported_packages or "*" in supported_packages ): unregistered_models.append(model_name) if unregistered_models: self.register_solution_package(package, unregistered_models) else: self.register_solution_package(package, None) return ( path, self.structure.package_struct_objs[ package.package_type.lower() ], ) else: if package.filename not in self._other_files: self._other_files[package.filename] = package else: # auto generate a unique file name and register it file_name = MFFileMgmt.unique_file_name( package.filename, self._other_files ) package.filename = file_name self._other_files[file_name] = package # If this package is declared in the namefile options block, # update namefile file_record = f"_{package.package_type}_filerecord" if hasattr(self.name_file, file_record): fr_obj = getattr(self.name_file, file_record) fr_obj.set_data(package.filename) if package.package_type.lower() in self.structure.package_struct_objs: return ( path, self.structure.package_struct_objs[ package.package_type.lower() ], ) elif package.package_type.lower() in self.structure.utl_struct_objs: return ( path, self.structure.utl_struct_objs[package.package_type.lower()], ) else: excpt_str = ( 'Invalid package type "{}". Unable to register ' "package.".format(package.package_type) ) print(excpt_str) raise FlopyException(excpt_str)
[docs] def rename_model_namefile(self, model, new_namefile): """ Rename a model's namefile. For internal flopy library use only. Parameters ---------- model : MFModel Model object whose namefile to rename new_namefile : str Name of the new namefile """ # update simulation name file models = self.name_file.models.get_data() for mdl in models: path, name_file_name = os.path.split(mdl[1]) if name_file_name == model.name_file.filename: mdl[1] = os.path.join(path, new_namefile) self.name_file.models.set_data(models)
[docs] def register_model(self, model, model_type, model_name, model_namefile): """ Add a model to the simulation. This is a call-back method made from the package and should not be called directly. Parameters ---------- model : MFModel Model object to add to simulation sln_group : str Solution group of model Returns -------- model_structure_object : MFModelStructure """ # get model structure from model type if model_type not in self.structure.model_struct_objs: message = f'Invalid model type: "{model_type}".' type_, value_, traceback_ = sys.exc_info() raise MFDataException( model.name, "", model.name, "registering model", "sim", inspect.stack()[0][3], type_, value_, traceback_, message, self.simulation_data.debug, ) # add model self._models[model_name] = model # update simulation name file self.name_file.models.append_list_as_record( [model_type, model_namefile, model_name] ) if len(self._solution_files) > 0: # register model with first solution file found first_solution_key = next(iter(self._solution_files)) self.register_solution_package( self._solution_files[first_solution_key], model_name ) return self.structure.model_struct_objs[model_type]
[docs] def get_solution_package(self, key): """ Get the solution package with the specified `key`. Parameters ---------- key : str solution package file name Returns -------- solution_package : MFPackage """ if key in self._solution_files: return self._solution_files[key] return None
[docs] def remove_model(self, model_name): """ Remove model with name `model_name` from the simulation Parameters ---------- model_name : str Model name to remove from simulation """ # remove model del self._models[model_name] # remove from solution group block self._remove_from_all_solution_groups(model_name) # remove from models block models_recarray = self.name_file.models.get_data() if models_recarray is not None: new_records = [] for record in models_recarray: if len(record) <= 2 or record[2] != model_name: new_records.append(tuple(record)) self.name_file.models.set_data(new_records) # remove from exchanges block exch_recarray = self.name_file.exchanges.get_data() if exch_recarray is not None: new_records = [] for record in exch_recarray: model_in_record = False if len(record) > 2: for item in list(record)[2:]: if item == model_name: model_in_record = True if not model_in_record: new_records.append(tuple(record)) if len(new_records) == 0: new_records = None self.name_file.exchanges.set_data(new_records)
[docs] def is_valid(self): """ Checks the validity of the solution and all of its models and packages. Returns true if the solution is valid, false if it is not. Returns -------- valid : bool Whether this is a valid simulation """ # name file valid if not self.name_file.is_valid(): return False # tdis file valid if not self._tdis_file.is_valid(): return False # exchanges valid for exchange in self._exchange_files: if not exchange.is_valid(): return False # solution files valid for solution_file in self._solution_files.values(): if not solution_file.is_valid(): return False # a model exists if not self._models: return False # models valid for key in self._models: if not self._models[key].is_valid(): return False # each model has a solution file return True
@staticmethod def _resolve_verbosity_level(verbosity_level): if verbosity_level == 0: return VerbosityLevel.quiet elif verbosity_level == 1: return VerbosityLevel.normal elif verbosity_level == 2: return VerbosityLevel.verbose else: return verbosity_level @staticmethod def _get_package_path(package): if package.parent_file is not None: return (package.parent_file.path) + (package.package_type,) else: return (package.package_type,) def _update_solution_group(self, solution_file, new_name=None): solution_recarray = self.name_file.solutiongroup for solution_group_num in solution_recarray.get_active_key_list(): try: rec_array = solution_recarray.get_data(solution_group_num[0]) except MFDataException as mfde: message = ( "An error occurred while getting solution group" '"{}" from the simulation name file' ".".format(solution_group_num[0]) ) raise MFDataException( mfdata_except=mfde, package="nam", message=message ) new_array = [] for record in rec_array: if record.slnfname == solution_file: if new_name is not None: record.slnfname = new_name new_array.append(tuple(record)) else: continue else: new_array.append(record) if not new_array: new_array = None solution_recarray.set_data(new_array, solution_group_num[0]) def _remove_from_all_solution_groups(self, modelname): solution_recarray = self.name_file.solutiongroup for solution_group_num in solution_recarray.get_active_key_list(): try: rec_array = solution_recarray.get_data(solution_group_num[0]) except MFDataException as mfde: message = ( "An error occurred while getting solution group" '"{}" from the simulation name file' ".".format(solution_group_num[0]) ) raise MFDataException( mfdata_except=mfde, package="nam", message=message ) new_array = ["no_check"] for index, record in enumerate(rec_array): new_record = [] new_record.append(record[0]) new_record.append(record[1]) for item in list(record)[2:]: if item is not None and item.lower() != modelname.lower(): new_record.append(item) new_array.append(tuple(new_record)) solution_recarray.set_data(new_array, solution_group_num[0]) def _append_to_solution_group(self, solution_file, new_models): # clear models out of solution groups if new_models is not None: for model in new_models: self._remove_from_all_solution_groups(model) # append models to solution_file solution_recarray = self.name_file.solutiongroup for solution_group_num in solution_recarray.get_active_key_list(): try: rec_array = solution_recarray.get_data(solution_group_num[0]) except MFDataException as mfde: message = ( "An error occurred while getting solution group" '"{}" from the simulation name file' ".".format(solution_group_num[0]) ) raise MFDataException( mfdata_except=mfde, package="nam", message=message ) new_array = [] for index, record in enumerate(rec_array): new_record = [] rec_model_dict = {} for index, item in enumerate(record): if ( record[1] == solution_file or item not in new_models ) and item is not None: new_record.append(item) if index > 1 and item is not None: rec_model_dict[item.lower()] = 1 if record[1] == solution_file: for model in new_models: if model.lower() not in rec_model_dict: new_record.append(model) new_array.append(tuple(new_record)) solution_recarray.set_data(new_array, solution_group_num[0]) def _replace_solution_in_solution_group(self, item, index, new_item): solution_recarray = self.name_file.solutiongroup for solution_group_num in solution_recarray.get_active_key_list(): try: rec_array = solution_recarray.get_data(solution_group_num[0]) except MFDataException as mfde: message = ( "An error occurred while getting solution group" '"{}" from the simulation name file. The error ' 'occurred while replacing solution file "{}" with "{}"' 'at index "{}"'.format( solution_group_num[0], item, new_item, index ) ) raise MFDataException( mfdata_except=mfde, package="nam", message=message ) if rec_array is not None: for rec_item in rec_array: if rec_item[index] == item: rec_item[index] = new_item def _is_in_solution_group(self, item, index, any_idx_after=False): solution_recarray = self.name_file.solutiongroup for solution_group_num in solution_recarray.get_active_key_list(): try: rec_array = solution_recarray.get_data(solution_group_num[0]) except MFDataException as mfde: message = ( "An error occurred while getting solution group" '"{}" from the simulation name file. The error ' 'occurred while verifying file "{}" at index "{}" ' "is in the simulation name file" ".".format(solution_group_num[0], item, index) ) raise MFDataException( mfdata_except=mfde, package="nam", message=message ) if rec_array is not None: for rec_item in rec_array: if any_idx_after: for idx in range(index, len(rec_item)): if rec_item[idx] == item: return True else: if rec_item[index] == item: return True return False
[docs] def plot( self, model_list: Optional[Union[str, List[str]]] = None, SelPackList=None, **kwargs, ): """ Plot simulation or models. Method to plot a whole simulation or a series of models that are part of a simulation. Parameters ---------- model_list: list, optional List of model names to plot, if none all models will be plotted SelPackList: list, optional List of package names to plot, if none all packages will be plotted kwargs: filename_base : str Base file name that will be used to automatically generate file names for output image files. Plots will be exported as image files if file_name_base is not None. (default is None) file_extension : str Valid matplotlib.pyplot file extension for savefig(). Only used if filename_base is not None. (default is 'png') mflay : int MODFLOW zero-based layer number to return. If None, then all layers will be included. (default is None) kper : int MODFLOW zero-based stress period number to return. (default is zero) key : str MFList dictionary key. (default is None) Returns -------- axes: (list) matplotlib.pyplot.axes objects """ from ...plot.plotutil import PlotUtilities axes = PlotUtilities._plot_simulation_helper( self, model_list=model_list, SelPackList=SelPackList, **kwargs ) return axes