Source code for flopy.mf6.mfbase

import glob
import importlib
import inspect, sys, traceback
import os, copy
from collections import OrderedDict
from collections.abc import Iterable
from shutil import copyfile
from enum import Enum


# internal handled exceptions
[docs]class MFInvalidTransientBlockHeaderException(Exception): """ Exception occurs when parsing a transient block header """ def __init__(self, error): Exception.__init__( self, "MFInvalidTransientBlockHeaderException: {}".format(error) )
[docs]class ReadAsArraysException(Exception): """ Exception occurs when loading ReadAsArrays package as non-ReadAsArrays package. """ def __init__(self, error): Exception.__init__(self, "ReadAsArraysException: {}".format(error))
# external exceptions for users
[docs]class FlopyException(Exception): """ General FloPy exception """ def __init__(self, error, location=""): self.message = error Exception.__init__( self, "FlopyException: {} ({})".format(error, location) )
[docs]class StructException(Exception): """ Exception with the package file structure """ def __init__(self, error, location): self.message = error Exception.__init__( self, "StructException: {} ({})".format(error, location) )
[docs]class MFDataException(Exception): """ Exception with MODFLOW data. Exception includes detailed error information. """ def __init__( self, model=None, package=None, path=None, current_process=None, data_element=None, method_caught_in=None, org_type=None, org_value=None, org_traceback=None, message=None, debug=None, mfdata_except=None, ): if mfdata_except is not None and isinstance( mfdata_except, MFDataException ): # copy constructor - copying values from original exception self.model = mfdata_except.model self.package = mfdata_except.package self.current_process = mfdata_except.current_process self.data_element = mfdata_except.data_element self.path = mfdata_except.path self.messages = mfdata_except.messages self.debug = mfdata_except.debug self.method_caught_in = mfdata_except.method_caught_in self.org_type = mfdata_except.org_type self.org_value = mfdata_except.org_value self.org_traceback = mfdata_except.org_traceback self.org_tb_string = mfdata_except.org_tb_string else: self.messages = [] if mfdata_except is not None and ( isinstance(mfdata_except, StructException) or isinstance(mfdata_except, FlopyException) ): self.messages.append(mfdata_except.message) self.model = None self.package = None self.current_process = None self.data_element = None self.path = None self.debug = False self.method_caught_in = None self.org_type = None self.org_value = None self.org_traceback = None self.org_tb_string = None # override/assign any values that are not none if model is not None: self.model = model if package is not None: self.package = package if current_process is not None: self.current_process = current_process if data_element is not None: self.data_element = data_element if path is not None: self.path = path if message is not None: self.messages.append(message) if debug is not None: self.debug = debug if method_caught_in is not None: self.method_caught_in = method_caught_in if org_type is not None: self.org_type = org_type if org_value is not None: self.org_value = org_value if org_traceback is not None: self.org_traceback = org_traceback self.org_tb_string = traceback.format_exception( self.org_type, self.org_value, self.org_traceback ) # build error string error_message_0 = "An error occurred in " if self.data_element is not None and self.data_element != "": error_message_1 = 'data element "{}" '.format(self.data_element) else: error_message_1 = "" if self.model is not None and self.model != "": error_message_2 = 'model "{}" '.format(self.model) else: error_message_2 = "" error_message_3 = 'package "{}".'.format(self.package) error_message_4 = ( ' The error occurred while {} in the "{}" method' ".".format(self.current_process, self.method_caught_in) ) if len(self.messages) > 0: error_message_5 = "\nAdditional Information:\n" for index, message in enumerate(self.messages): error_message_5 = "{}({}) {}\n".format( error_message_5, index + 1, message ) else: error_message_5 = "" error_message = "{}{}{}{}{}{}".format( error_message_0, error_message_1, error_message_2, error_message_3, error_message_4, error_message_5, ) Exception.__init__(self, error_message)
[docs]class VerbosityLevel(Enum): """Determines how much information FloPy writes to the console""" quiet = 1 normal = 2 verbose = 3
[docs]class PackageContainerType(Enum): """Determines whether a package container is a simulation, model, or package.""" simulation = 1 model = 2 package = 3
[docs]class ExtFileAction(Enum): """Defines what to do with external files when the simulation or model's path change.""" copy_all = 1 copy_none = 2 copy_relative_paths = 3
[docs]class MFFilePath: """Class that stores a single file path along with the associated model name.""" def __init__(self, file_path, model_name): self.file_path = file_path self.model_name = {model_name: 0}
[docs] def isabs(self): return os.path.isabs(self.file_path)
[docs]class MFFileMgmt: """ Class containing MODFLOW path data Parameters ---------- path : str Path on disk to the simulation Attributes ---------- model_relative_path : OrderedDict Dictionary of relative paths to each model folder """ def __init__(self, path, mfsim=None): self.simulation = mfsim self._sim_path = "" self.set_sim_path(path, True) # keys:fully pathed filenames, vals:FilePath instances self.existing_file_dict = {} # keys:filenames,vals:instance name self.model_relative_path = OrderedDict() self._last_loaded_sim_path = None self._last_loaded_model_relative_path = OrderedDict()
[docs] def copy_files(self, copy_relative_only=True): """Copy files external to updated path. Parameters ---------- copy_relative_only : bool Only copy files with relative paths. """ num_files_copied = 0 if self._last_loaded_sim_path is not None: for mffile_path in self.existing_file_dict.values(): # resolve previous simulation path. if mf6 changes # so that paths are relative to the model folder, then # this call should have "model_name" instead of "None" path_old = self.resolve_path(mffile_path, None, True) if os.path.isfile(path_old) and ( not mffile_path.isabs() or not copy_relative_only ): # change "None" to "model_name" as above if mf6 # supports model relative paths path_new = self.resolve_path(mffile_path, None) if path_old != path_new: new_folders = os.path.split(path_new)[0] if not os.path.exists(new_folders): os.makedirs(new_folders) try: copyfile(path_old, path_new) except: type_, value_, traceback_ = sys.exc_info() raise MFDataException( self.structure.get_model(), self.structure.get_package(), self._path, "appending data", self.structure.name, inspect.stack()[0][3], type_, value_, traceback_, None, self._simulation_data.debug, ) num_files_copied += 1 return num_files_copied
[docs] def get_updated_path( self, external_file_path, model_name, ext_file_action ): """For internal FloPy use, not intended for end user.""" return external_file_path
def _build_relative_path(self, model_name): old_abs_path = self.resolve_path("", model_name, True) current_abs_path = self.resolve_path("", model_name, False) return os.path.relpath(old_abs_path, current_abs_path)
[docs] def strip_model_relative_path(self, model_name, path): """Strip out the model relative path part of `path`. For internal FloPy use, not intended for end user.""" if model_name in self.model_relative_path: model_rel_path = self.model_relative_path[model_name] new_path = None while path: path, leaf = os.path.split(path) if leaf != model_rel_path: if new_path: new_path = os.path.join(leaf, new_path) else: new_path = leaf return new_path
[docs] @staticmethod def unique_file_name(file_name, lookup): """Generate a unique file name. For internal FloPy use, not intended for end user.""" num = 0 while MFFileMgmt._build_file(file_name, num) in lookup: num += 1 return MFFileMgmt._build_file(file_name, num)
@staticmethod def _build_file(file_name, num): file, ext = os.path.splitext(file_name) if ext: return "{}_{}{}".format(file, num, ext) else: return "{}_{}".format(file, num)
[docs] @staticmethod def string_to_file_path(fp_string): """Interpret string as a file path. For internal FloPy use, not intended for end user.""" file_delimiters = ["/", "\\"] new_string = fp_string for delimiter in file_delimiters: arr_string = new_string.split(delimiter) if len(arr_string) > 1: if os.path.isabs(fp_string): new_string = "{}{}{}".format( arr_string[0], delimiter, arr_string[1] ) else: new_string = os.path.join(arr_string[0], arr_string[1]) if len(arr_string) > 2: for path_piece in arr_string[2:]: new_string = os.path.join(new_string, path_piece) return new_string
[docs] def set_last_accessed_path(self): """Set the last accessed simulation path to the current simulation path. For internal FloPy use, not intended for end user.""" self._last_loaded_sim_path = self._sim_path self.set_last_accessed_model_path()
[docs] def set_last_accessed_model_path(self): """Set the last accessed model path to the current model path. For internal FloPy use, not intended for end user.""" for key, item in self.model_relative_path.items(): self._last_loaded_model_relative_path[key] = copy.deepcopy(item)
[docs] def get_model_path(self, key, last_loaded_path=False): """Returns the model working path for the model `key`. Parameters ---------- key : str Model name whose path flopy will retrieve last_loaded_path : bool Get the last path loaded by FloPy which may not be the most recent path. Returns ------- model path : str """ if last_loaded_path: return os.path.join( self._last_loaded_sim_path, self._last_loaded_model_relative_path[key], ) else: if key in self.model_relative_path: return os.path.join( self._sim_path, self.model_relative_path[key] ) else: return self._sim_path
[docs] def get_sim_path(self, last_loaded_path=False): """Get the simulation path.""" if last_loaded_path: return self._last_loaded_sim_path else: return self._sim_path
[docs] def add_ext_file(self, file_path, model_name): """Add an external file to the path list. For internal FloPy use, not intended for end user.""" if file_path in self.existing_file_dict: if model_name not in self.existing_file_dict[file_path].model_name: self.existing_file_dict[file_path].model_name[model_name] = 0 else: new_file_path = MFFilePath(file_path, model_name) self.existing_file_dict[file_path] = new_file_path
[docs] def set_sim_path(self, path, internal_use=False): """ Set the file path to the simulation files. Internal use only, call MFSimulation's set_sim_path method instead. Parameters ---------- path : str Full path or relative path from working directory to simulation folder Returns ------- Examples -------- self.simulation_data.mfdata.set_sim_path('sim_folder') """ if not internal_use: print( "WARNING: MFFileMgt's set_sim_path has been deprecated. " "Please use MFSimulation's set_sim_path in the future." ) if self.simulation is not None: self.simulation.set_sim_path(path) return # recalculate paths for everything # resolve path type path = self.string_to_file_path(path) if os.path.isabs(path): self._sim_path = path else: # assume path is relative to working directory self._sim_path = os.path.join(os.getcwd(), path)
[docs] def resolve_path( self, path, model_name, last_loaded_path=False, move_abs_paths=False ): """Resolve a simulation or model path. For internal FloPy use, not intended for end user.""" if isinstance(path, MFFilePath): file_path = path.file_path else: file_path = path if os.path.isabs(file_path): # path is an absolute path if move_abs_paths: return self.get_sim_path(last_loaded_path) else: return file_path else: # path is a relative path return os.path.join(self.get_sim_path(last_loaded_path), file_path)
[docs]class PackageContainer: """ Base class for any class containing packages. Parameters ---------- simulation_data : SimulationData The simulation's SimulationData object name : str Name of the package container object Attributes ---------- package_type_dict : dictionary Dictionary of packages by package type package_name_dict : dictionary Dictionary of packages by package name package_key_dict : dictionary Dictionary of packages by package key """ def __init__(self, simulation_data, name): self.type = "PackageContainer" self.simulation_data = simulation_data self.name = name self._packagelist = [] self.package_type_dict = {} self.package_name_dict = {} self.package_key_dict = {}
[docs] @staticmethod def package_factory(package_type, model_type): """Static method that returns the appropriate package type object based on the package_type and model_type strings. For internal FloPy use only, not intended for end users. Parameters ---------- package_type : str Type of package to create model_type : str Type of model that package is a part of Returns ------- package : MFPackage subclass """ package_abbr = "{}{}".format(model_type, package_type) package_utl_abbr = "utl{}".format(package_type) package_list = [] # iterate through python files package_file_paths = PackageContainer.get_package_file_paths() for package_file_path in package_file_paths: module = PackageContainer.get_module(package_file_path) if module is not None: # iterate imported items for item in dir(module): value = PackageContainer.get_module_val( module, item, "package_abbr" ) if value is not None: abbr = value.package_abbr if package_type is None: # don't store packages "group" classes if len(abbr) <= 8 or abbr[-8:] != "packages": package_list.append(value) else: # check package type if ( value.package_abbr == package_abbr or value.package_abbr == package_utl_abbr ): return value if package_type is None: return package_list else: return None
[docs] @staticmethod def model_factory(model_type): """Static method that returns the appropriate model type object based on the model_type string. For internal FloPy use only, not intended for end users. Parameters ---------- model_type : str Type of model that package is a part of Returns ------- model : MFModel subclass """ package_file_paths = PackageContainer.get_package_file_paths() for package_file_path in package_file_paths: module = PackageContainer.get_module(package_file_path) if module is not None: # iterate imported items for item in dir(module): value = PackageContainer.get_module_val( module, item, "model_type" ) if value is not None and value.model_type == model_type: return value return None
[docs] @staticmethod def get_module_val(module, item, attrb): """Static method that returns a python class module value. For internal FloPy use only, not intended for end users.""" value = getattr(module, item) # verify this is a class if ( not value or not inspect.isclass(value) or not hasattr(value, attrb) ): return None return value
[docs] @staticmethod def get_module(package_file_path): """Static method that returns the python module library. For internal FloPy use only, not intended for end users.""" package_file_name = os.path.basename(package_file_path) module_path = os.path.splitext(package_file_name)[0] module_name = "{}{}{}".format( "Modflow", module_path[2].upper(), module_path[3:] ) if module_name.startswith("__"): return None # import return importlib.import_module( "flopy.mf6.modflow.{}".format(module_path) )
[docs] @staticmethod def get_package_file_paths(): """Static method that gets the paths of all the FloPy python package files. For internal FloPy use only, not intended for end users. """ base_path = os.path.split(os.path.realpath(__file__))[0] package_path = os.path.join(base_path, "modflow") return glob.glob(os.path.join(package_path, "*.py"))
@property def package_dict(self): """Returns a copy of the package name dictionary.""" return self.package_name_dict.copy() @property def package_names(self): """Returns a list of package names.""" return list(self.package_name_dict.keys()) def _add_package(self, package, path): # put in packages list and update lookup dictionaries self._packagelist.append(package) if package.package_name is not None: self.package_name_dict[package.package_name.lower()] = package self.package_key_dict[path[-1].lower()] = package if package.package_type not in self.package_type_dict: self.package_type_dict[package.package_type.lower()] = [] self.package_type_dict[package.package_type.lower()].append(package) def _remove_package(self, package): self._packagelist.remove(package) if ( package.package_name is not None and package.package_name.lower() in self.package_name_dict ): del self.package_name_dict[package.package_name.lower()] del self.package_key_dict[package.path[-1].lower()] package_list = self.package_type_dict[package.package_type.lower()] package_list.remove(package) if len(package_list) == 0: del self.package_type_dict[package.package_type.lower()] # collect keys of items to be removed from main dictionary items_to_remove = [] for key in self.simulation_data.mfdata: is_subkey = True for pitem, ditem in zip(package.path, key): if pitem != ditem: is_subkey = False break if is_subkey: items_to_remove.append(key) # remove items from main dictionary for key in items_to_remove: del self.simulation_data.mfdata[key]
[docs] def get_package(self, name=None): """ Finds a package by package name, package key, package type, or partial package name. returns either a single package, a list of packages, or None. Parameters ---------- name : str Name of the package, 'RIV', 'LPF', etc. Returns ------- pp : Package object """ if name is None: return self._packagelist[:] # search for full package name if name.lower() in self.package_name_dict: return self.package_name_dict[name.lower()] # search for package type if name.lower() in self.package_type_dict: if len(self.package_type_dict[name.lower()]) == 0: return None elif len(self.package_type_dict[name.lower()]) == 1: return self.package_type_dict[name.lower()][0] else: return self.package_type_dict[name.lower()] # search for package key if name.lower() in self.package_key_dict: return self.package_key_dict[name.lower()] # search for partial and case-insensitive package name for pp in self._packagelist: if pp.package_name is not None: # get first package of the type requested package_name = pp.package_name.lower() if len(package_name) > len(name): package_name = package_name[0 : len(name)] if package_name.lower() == name.lower(): return pp return None
[docs] def register_package(self, package): """Base method for registering a package. Should be overridden.""" path = (package.package_name,) return (path, None)
@staticmethod def _load_only_dict(load_only): if load_only is None: return None if isinstance(load_only, dict): return load_only if not isinstance(load_only, Iterable): raise FlopyException( "load_only must be iterable or None. " 'load_only value of "{}" is ' "invalid".format(load_only) ) load_only_dict = {} for item in load_only: load_only_dict[item.lower()] = True return load_only_dict @staticmethod def _in_pkg_list(pkg_list, pkg_type, pkg_name): if pkg_type is not None: pkg_type = pkg_type.lower() if pkg_name is not None: pkg_name = pkg_name.lower() if pkg_type in pkg_list or pkg_name in pkg_list: return True # split to make cases like "gwf6-gwf6" easier to process pkg_type = pkg_type.split("-") try: # if there is a number on the end of the package try # excluding it int(pkg_type[0][-1]) for key in pkg_list.keys(): key = key.split("-") if len(key) == len(pkg_type): matches = True for key_item, pkg_item in zip(key, pkg_type): if pkg_item[0:-1] != key_item and pkg_item != key_item: matches = False if matches: return True except ValueError: return False return False