Source code for flopy.mf6.mfbase

"""Base classes for Modflow 6"""

import copy
import inspect
import os
import sys
import traceback
import warnings
from collections.abc import Iterable
from enum import Enum
from pathlib import Path
from shutil import copyfile
from typing import Union
from warnings import warn


# internal handled exceptions
[docs]class MFInvalidTransientBlockHeaderException(Exception): """ Exception occurs when parsing a transient block header """
[docs]class ReadAsArraysException(Exception): """ Exception occurs when loading ReadAsArrays package as non-ReadAsArrays package. """
# external exceptions for users
[docs]class FlopyException(Exception): """ General FloPy exception """ def __init__(self, error, location=""): self.message = error super().__init__(f"{error} ({location})")
[docs]class StructException(Exception): """ Exception with the package file structure """ def __init__(self, error, location): self.message = error super().__init__(f"{error} ({location})")
[docs]class MFDataException(Exception): """ Exception with MODFLOW data. Exception includes detailed error information. """ def __init__( self, model=None, package=None, path=None, current_process=None, data_element=None, method_caught_in=None, org_type=None, org_value=None, org_traceback=None, message=None, debug=None, mfdata_except=None, ): if mfdata_except is not None and isinstance( mfdata_except, MFDataException ): # copy constructor - copying values from original exception self.model = mfdata_except.model self.package = mfdata_except.package self.current_process = mfdata_except.current_process self.data_element = mfdata_except.data_element self.path = mfdata_except.path self.messages = mfdata_except.messages self.debug = mfdata_except.debug self.method_caught_in = mfdata_except.method_caught_in self.org_type = mfdata_except.org_type self.org_value = mfdata_except.org_value self.org_traceback = mfdata_except.org_traceback self.org_tb_string = mfdata_except.org_tb_string else: self.messages = [] if mfdata_except is not None and ( isinstance(mfdata_except, StructException) or isinstance(mfdata_except, FlopyException) ): self.messages.append(mfdata_except.message) self.model = None self.package = None self.current_process = None self.data_element = None self.path = None self.debug = False self.method_caught_in = None self.org_type = None self.org_value = None self.org_traceback = None self.org_tb_string = None # override/assign any values that are not none if model is not None: self.model = model if package is not None: self.package = package if current_process is not None: self.current_process = current_process if data_element is not None: self.data_element = data_element if path is not None: self.path = path if message is not None: self.messages.append(message) if debug is not None: self.debug = debug if method_caught_in is not None: self.method_caught_in = method_caught_in if org_type is not None: self.org_type = org_type if org_value is not None: self.org_value = org_value if org_traceback is not None: self.org_traceback = org_traceback self.org_tb_string = traceback.format_exception( self.org_type, self.org_value, self.org_traceback ) # build error string error_message = "An error occurred in " if self.data_element is not None and self.data_element != "": error_message += f'data element "{self.data_element}" ' if self.model is not None and self.model != "": error_message += f'model "{self.model}" ' error_message += ( f'package "{self.package}". The error occurred while ' f'{self.current_process} in the "{self.method_caught_in}" method.' ) if len(self.messages) > 0: error_message += "\nAdditional Information:\n" error_message += "\n".join( f"({idx}) {msg}" for (idx, msg) in enumerate(self.messages, 1) ) super().__init__(error_message)
[docs]class VerbosityLevel(Enum): """Determines how much information FloPy writes to the console""" quiet = 1 normal = 2 verbose = 3
[docs]class PackageContainerType(Enum): """Determines whether a package container is a simulation, model, or package.""" simulation = 1 model = 2 package = 3
[docs]class ExtFileAction(Enum): """Defines what to do with external files when the simulation or model's path change.""" copy_all = 1 copy_none = 2 copy_relative_paths = 3
[docs]class MFFilePath: """Class that stores a single file path along with the associated model name.""" def __init__(self, file_path, model_name): self.file_path = file_path self.model_name = {model_name: 0}
[docs] def isabs(self): return os.path.isabs(self.file_path)
[docs]class MFFileMgmt: """ Class containing MODFLOW path data Parameters ---------- path : str or PathLike Path on disk to the simulation Attributes ---------- model_relative_path : dict Dictionary of relative paths to each model folder """ def __init__(self, path: Union[str, os.PathLike], mfsim=None): self.simulation = mfsim self._sim_path = "" self.set_sim_path(path, True) # keys:fully pathed filenames, vals:FilePath instances self.existing_file_dict = {} # keys:filenames,vals:instance name self.model_relative_path = {} self._last_loaded_sim_path = None self._last_loaded_model_relative_path = {}
[docs] def copy_files(self, copy_relative_only=True): """Copy files external to updated path. Parameters ---------- copy_relative_only : bool Only copy files with relative paths. """ num_files_copied = 0 if self._last_loaded_sim_path is not None: for mffile_path in self.existing_file_dict.values(): # resolve previous simulation path. if mf6 changes # so that paths are relative to the model folder, then # this call should have "model_name" instead of "None" path_old = self.resolve_path(mffile_path, None, True) if os.path.isfile(path_old) and ( not mffile_path.isabs() or not copy_relative_only ): # change "None" to "model_name" as above if mf6 # supports model relative paths path_new = self.resolve_path(mffile_path, None) if path_old != path_new: new_folders = os.path.split(path_new)[0] if not os.path.exists(new_folders): os.makedirs(new_folders) try: copyfile(path_old, path_new) except: type_, value_, traceback_ = sys.exc_info() raise MFDataException( self.structure.get_model(), self.structure.get_package(), self._path, "appending data", self.structure.name, inspect.stack()[0][3], type_, value_, traceback_, None, self._simulation_data.debug, ) num_files_copied += 1 return num_files_copied
[docs] def get_updated_path( self, external_file_path, model_name, ext_file_action ): """For internal FloPy use, not intended for end user.""" return external_file_path
def _build_relative_path(self, model_name): old_abs_path = self.resolve_path("", model_name, True) current_abs_path = self.resolve_path("", model_name, False) return os.path.relpath(old_abs_path, current_abs_path)
[docs] def strip_model_relative_path(self, model_name, path) -> str: """Strip out the model relative path part of `path`. For internal FloPy use, not intended for end user.""" if model_name not in self.model_relative_path: return path model_rel_path = Path(self.model_relative_path[model_name]) if ( model_rel_path is None or model_rel_path.is_absolute() or not any(str(model_rel_path)) or str(model_rel_path) == os.curdir ): return path try: ret_path = Path(path).relative_to(model_rel_path) except ValueError: warnings.warn( f"Could not strip model relative path from {path}: {traceback.format_exc()}" ) ret_path = Path(path) return str(ret_path.as_posix())
[docs] @staticmethod def unique_file_name(file_name, lookup): """Generate a unique file name. For internal FloPy use, not intended for end user.""" num = 0 while MFFileMgmt._build_file(file_name, num) in lookup: num += 1 return MFFileMgmt._build_file(file_name, num)
@staticmethod def _build_file(file_name, num): file, ext = os.path.splitext(file_name) if ext: return f"{file}_{num}{ext}" else: return f"{file}_{num}"
[docs] def set_last_accessed_path(self): """Set the last accessed simulation path to the current simulation path. For internal FloPy use, not intended for end user.""" self._last_loaded_sim_path = self._sim_path self.set_last_accessed_model_path()
[docs] def set_last_accessed_model_path(self): """Set the last accessed model path to the current model path. For internal FloPy use, not intended for end user.""" for key, item in self.model_relative_path.items(): self._last_loaded_model_relative_path[key] = copy.deepcopy(item)
[docs] def get_model_path(self, key, last_loaded_path=False): """Returns the model working path for the model `key`. Parameters ---------- key : str Model name whose path flopy will retrieve last_loaded_path : bool Get the last path loaded by FloPy which may not be the most recent path. Returns ------- model path : str """ if last_loaded_path: return os.path.join( self._last_loaded_sim_path, self._last_loaded_model_relative_path[key], ) else: if key in self.model_relative_path: return os.path.join( self._sim_path, self.model_relative_path[key] ) else: return self._sim_path
[docs] def get_sim_path(self, last_loaded_path=False): """Get the simulation path.""" if last_loaded_path: return self._last_loaded_sim_path else: return self._sim_path
[docs] def add_ext_file(self, file_path, model_name): """Add an external file to the path list. For internal FloPy use, not intended for end user.""" if file_path in self.existing_file_dict: if model_name not in self.existing_file_dict[file_path].model_name: self.existing_file_dict[file_path].model_name[model_name] = 0 else: new_file_path = MFFilePath(file_path, model_name) self.existing_file_dict[file_path] = new_file_path
[docs] def set_sim_path(self, path: Union[str, os.PathLike], internal_use=False): """ Set the file path to the simulation files. Internal use only, call MFSimulation's set_sim_path method instead. Parameters ---------- path : str or PathLike Path to simulation folder Returns ------- None Examples -------- self.simulation_data.mfdata.set_sim_path('path/to/workspace') """ if not internal_use: print( "WARNING: MFFileMgt's set_sim_path has been deprecated. " "Please use MFSimulation's set_sim_path in the future." ) if self.simulation is not None: self.simulation.set_sim_path(path) return # expand tildes and ensure _sim_path is absolute self._sim_path = Path(path).expanduser().absolute()
[docs] def resolve_path( self, path, model_name, last_loaded_path=False, move_abs_paths=False ): """Resolve a simulation or model path. For internal FloPy use, not intended for end user.""" if isinstance(path, MFFilePath): file_path = str(path.file_path) else: file_path = str(path) # remove quote characters from file path file_path = file_path.replace("'", "") file_path = file_path.replace('"', "") if os.path.isabs(file_path): # path is an absolute path if move_abs_paths: return self.get_sim_path(last_loaded_path) else: return file_path else: # path is a relative path return os.path.join(self.get_sim_path(last_loaded_path), file_path)
[docs]class PackageContainer: """ Base class for any class containing packages. Parameters ---------- simulation_data : SimulationData The simulation's SimulationData object name : str Name of the package container object Attributes ---------- package_type_dict : dictionary Dictionary of packages by package type package_name_dict : dictionary Dictionary of packages by package name """ modflow_packages = [] packages_by_abbr = {} modflow_models = [] models_by_type = {} def __init__(self, simulation_data, name): self.type = "PackageContainer" self.simulation_data = simulation_data self.name = name self._packagelist = [] self.package_type_dict = {} self.package_name_dict = {} self.package_filename_dict = {} @property def package_key_dict(self): warnings.warn( "package_key_dict has been deprecated, use " "package_type_dict instead", category=DeprecationWarning, ) return self.package_type_dict
[docs] @staticmethod def package_list(): """Static method that returns the list of available packages. For internal FloPy use only, not intended for end users. Returns a list of MFPackage subclasses """ # all packages except "group" classes package_list = [] for abbr, package in sorted(PackageContainer.packages_by_abbr.items()): # don't store packages "group" classes if not abbr.endswith("packages"): package_list.append(package) return package_list
[docs] @staticmethod def package_factory(package_type: str, model_type: str): """Static method that returns the appropriate package type object based on the package_type and model_type strings. For internal FloPy use only, not intended for end users. Parameters ---------- package_type : str Type of package to create model_type : str Type of model that package is a part of Returns ------- package : MFPackage subclass """ package_abbr = f"{model_type}{package_type}" factory = PackageContainer.packages_by_abbr.get(package_abbr) if factory is None: package_utl_abbr = f"utl{package_type}" factory = PackageContainer.packages_by_abbr.get(package_utl_abbr) return factory
[docs] @staticmethod def model_factory(model_type): """Static method that returns the appropriate model type object based on the model_type string. For internal FloPy use only, not intended for end users. Parameters ---------- model_type : str Type of model that package is a part of Returns ------- model : MFModel subclass """ return PackageContainer.models_by_type.get(model_type)
[docs] @staticmethod def get_module_val(module, item, attrb): """Static method that returns a python class module value. For internal FloPy use only, not intended for end users.""" value = getattr(module, item) # verify this is a class if ( not value or not inspect.isclass(value) or not hasattr(value, attrb) ): return None return value
@property def package_dict(self): """Returns a copy of the package name dictionary.""" return self.package_name_dict.copy() @property def package_names(self): """Returns a list of package names.""" return list(self.package_name_dict.keys()) def _add_package(self, package, path): # put in packages list and update lookup dictionaries self._packagelist.append(package) if package.package_name is not None: self.package_name_dict[package.package_name.lower()] = package if package.filename is not None: self.package_filename_dict[package.filename.lower()] = package if package.package_type not in self.package_type_dict: self.package_type_dict[package.package_type.lower()] = [] self.package_type_dict[package.package_type.lower()].append(package) def _remove_package(self, package): if package in self._packagelist: self._packagelist.remove(package) if ( package.package_name is not None and package.package_name.lower() in self.package_name_dict ): del self.package_name_dict[package.package_name.lower()] if ( package.filename is not None and package.filename.lower() in self.package_filename_dict ): del self.package_filename_dict[package.filename.lower()] if package.package_type.lower() in self.package_type_dict: package_list = self.package_type_dict[package.package_type.lower()] if package in package_list: package_list.remove(package) if len(package_list) == 0: del self.package_type_dict[package.package_type.lower()] # collect keys of items to be removed from main dictionary items_to_remove = [] for key in self.simulation_data.mfdata: is_subkey = True for pitem, ditem in zip(package.path, key): if pitem != ditem: is_subkey = False break if is_subkey: items_to_remove.append(key) # remove items from main dictionary for key in items_to_remove: del self.simulation_data.mfdata[key] def _rename_package(self, package, new_name): # fix package_name_dict key if ( package.package_name is not None and package.package_name.lower() in self.package_name_dict ): del self.package_name_dict[package.package_name.lower()] self.package_name_dict[new_name.lower()] = package # get keys to fix in main dictionary main_dict = self.simulation_data.mfdata items_to_fix = [] for key in main_dict: is_subkey = True for pitem, ditem in zip(package.path, key): if pitem != ditem: is_subkey = False break if is_subkey: items_to_fix.append(key) # fix keys in main dictionary for key in items_to_fix: new_key = ( package.path[:-1] + (new_name,) + key[len(package.path) - 1 :] ) main_dict[new_key] = main_dict.pop(key)
[docs] def get_package(self, name=None, type_only=False, name_only=False): """ Finds a package by package name, package key, package type, or partial package name. returns either a single package, a list of packages, or None. Parameters ---------- name : str Name or type of the package, 'my-riv-1, 'RIV', 'LPF', etc. type_only : bool Search for package by type only name_only : bool Search for package by name only Returns ------- pp : Package object """ if name is None: return self._packagelist[:] # search for full package name if name.lower() in self.package_name_dict and not type_only: return self.package_name_dict[name.lower()] # search for package type if name.lower() in self.package_type_dict and not name_only: if len(self.package_type_dict[name.lower()]) == 0: return None elif len(self.package_type_dict[name.lower()]) == 1: return self.package_type_dict[name.lower()][0] else: return self.package_type_dict[name.lower()] # search for file name if name.lower() in self.package_filename_dict and not type_only: return self.package_filename_dict[name.lower()] # search for partial and case-insensitive package name if not type_only: for pp in self._packagelist: if pp.package_name is not None: # get first package of the type requested package_name = pp.package_name.lower() if len(package_name) > len(name): package_name = package_name[0 : len(name)] if package_name.lower() == name.lower(): return pp return None
[docs] def register_package(self, package): """Base method for registering a package. Should be overridden.""" path = (package.package_name,) return (path, None)
@staticmethod def _load_only_dict(load_only): if load_only is None: return None if isinstance(load_only, dict): return load_only if not isinstance(load_only, Iterable): raise FlopyException( "load_only must be iterable or None. " 'load_only value of "{}" is ' "invalid".format(load_only) ) load_only_dict = {} for item in load_only: load_only_dict[item.lower()] = True return load_only_dict @staticmethod def _in_pkg_list(pkg_list, pkg_type, pkg_name): if pkg_type is not None: pkg_type = pkg_type.lower() if pkg_name is not None: pkg_name = pkg_name.lower() if pkg_type in pkg_list or pkg_name in pkg_list: return True # split to make cases like "gwf6-gwf6" easier to process pkg_type = pkg_type.split("-") try: # if there is a number on the end of the package try # excluding it int(pkg_type[0][-1]) for key in pkg_list.keys(): key = key.split("-") if len(key) == len(pkg_type): matches = True for key_item, pkg_item in zip(key, pkg_type): if pkg_item[0:-1] != key_item and pkg_item != key_item: matches = False if matches: return True except ValueError: return False return False