""" Base classes for Modflow 6 """
import copy
import inspect
import os
import sys
import traceback
from collections.abc import Iterable
from enum import Enum
from shutil import copyfile
# internal handled exceptions
[docs]class ReadAsArraysException(Exception):
"""
Exception occurs when loading ReadAsArrays package as non-ReadAsArrays
package.
"""
# external exceptions for users
[docs]class FlopyException(Exception):
"""
General FloPy exception
"""
def __init__(self, error, location=""):
self.message = error
super().__init__(f"{error} ({location})")
[docs]class StructException(Exception):
"""
Exception with the package file structure
"""
def __init__(self, error, location):
self.message = error
super().__init__(f"{error} ({location})")
[docs]class MFDataException(Exception):
"""
Exception with MODFLOW data. Exception includes detailed error
information.
"""
def __init__(
self,
model=None,
package=None,
path=None,
current_process=None,
data_element=None,
method_caught_in=None,
org_type=None,
org_value=None,
org_traceback=None,
message=None,
debug=None,
mfdata_except=None,
):
if mfdata_except is not None and isinstance(
mfdata_except, MFDataException
):
# copy constructor - copying values from original exception
self.model = mfdata_except.model
self.package = mfdata_except.package
self.current_process = mfdata_except.current_process
self.data_element = mfdata_except.data_element
self.path = mfdata_except.path
self.messages = mfdata_except.messages
self.debug = mfdata_except.debug
self.method_caught_in = mfdata_except.method_caught_in
self.org_type = mfdata_except.org_type
self.org_value = mfdata_except.org_value
self.org_traceback = mfdata_except.org_traceback
self.org_tb_string = mfdata_except.org_tb_string
else:
self.messages = []
if mfdata_except is not None and (
isinstance(mfdata_except, StructException)
or isinstance(mfdata_except, FlopyException)
):
self.messages.append(mfdata_except.message)
self.model = None
self.package = None
self.current_process = None
self.data_element = None
self.path = None
self.debug = False
self.method_caught_in = None
self.org_type = None
self.org_value = None
self.org_traceback = None
self.org_tb_string = None
# override/assign any values that are not none
if model is not None:
self.model = model
if package is not None:
self.package = package
if current_process is not None:
self.current_process = current_process
if data_element is not None:
self.data_element = data_element
if path is not None:
self.path = path
if message is not None:
self.messages.append(message)
if debug is not None:
self.debug = debug
if method_caught_in is not None:
self.method_caught_in = method_caught_in
if org_type is not None:
self.org_type = org_type
if org_value is not None:
self.org_value = org_value
if org_traceback is not None:
self.org_traceback = org_traceback
self.org_tb_string = traceback.format_exception(
self.org_type, self.org_value, self.org_traceback
)
# build error string
error_message = "An error occurred in "
if self.data_element is not None and self.data_element != "":
error_message += f'data element "{self.data_element}" '
if self.model is not None and self.model != "":
error_message += f'model "{self.model}" '
error_message += (
f'package "{self.package}". The error occurred while '
f'{self.current_process} in the "{self.method_caught_in}" method.'
)
if len(self.messages) > 0:
error_message += "\nAdditional Information:\n"
error_message += "\n".join(
f"({idx}) {msg}" for (idx, msg) in enumerate(self.messages, 1)
)
super().__init__(error_message)
[docs]class VerbosityLevel(Enum):
"""Determines how much information FloPy writes to the console"""
quiet = 1
normal = 2
verbose = 3
[docs]class PackageContainerType(Enum):
"""Determines whether a package container is a simulation, model, or
package."""
simulation = 1
model = 2
package = 3
[docs]class ExtFileAction(Enum):
"""Defines what to do with external files when the simulation or model's
path change."""
copy_all = 1
copy_none = 2
copy_relative_paths = 3
[docs]class MFFilePath:
"""Class that stores a single file path along with the associated model
name."""
def __init__(self, file_path, model_name):
self.file_path = file_path
self.model_name = {model_name: 0}
[docs] def isabs(self):
return os.path.isabs(self.file_path)
[docs]class MFFileMgmt:
"""
Class containing MODFLOW path data
Parameters
----------
path : str
Path on disk to the simulation
Attributes
----------
model_relative_path : dict
Dictionary of relative paths to each model folder
"""
def __init__(self, path, mfsim=None):
self.simulation = mfsim
self._sim_path = ""
self.set_sim_path(path, True)
# keys:fully pathed filenames, vals:FilePath instances
self.existing_file_dict = {}
# keys:filenames,vals:instance name
self.model_relative_path = {}
self._last_loaded_sim_path = None
self._last_loaded_model_relative_path = {}
[docs] def copy_files(self, copy_relative_only=True):
"""Copy files external to updated path.
Parameters
----------
copy_relative_only : bool
Only copy files with relative paths.
"""
num_files_copied = 0
if self._last_loaded_sim_path is not None:
for mffile_path in self.existing_file_dict.values():
# resolve previous simulation path. if mf6 changes
# so that paths are relative to the model folder, then
# this call should have "model_name" instead of "None"
path_old = self.resolve_path(mffile_path, None, True)
if os.path.isfile(path_old) and (
not mffile_path.isabs() or not copy_relative_only
):
# change "None" to "model_name" as above if mf6
# supports model relative paths
path_new = self.resolve_path(mffile_path, None)
if path_old != path_new:
new_folders = os.path.split(path_new)[0]
if not os.path.exists(new_folders):
os.makedirs(new_folders)
try:
copyfile(path_old, path_new)
except:
type_, value_, traceback_ = sys.exc_info()
raise MFDataException(
self.structure.get_model(),
self.structure.get_package(),
self._path,
"appending data",
self.structure.name,
inspect.stack()[0][3],
type_,
value_,
traceback_,
None,
self._simulation_data.debug,
)
num_files_copied += 1
return num_files_copied
[docs] def get_updated_path(
self, external_file_path, model_name, ext_file_action
):
"""For internal FloPy use, not intended for end user."""
return external_file_path
def _build_relative_path(self, model_name):
old_abs_path = self.resolve_path("", model_name, True)
current_abs_path = self.resolve_path("", model_name, False)
return os.path.relpath(old_abs_path, current_abs_path)
[docs] def strip_model_relative_path(self, model_name, path):
"""Strip out the model relative path part of `path`. For internal
FloPy use, not intended for end user."""
new_path = path
if model_name in self.model_relative_path:
model_rel_path = self.model_relative_path[model_name]
if (
model_rel_path is not None
and len(model_rel_path) > 0
and model_rel_path != "."
):
model_rel_path_lst = model_rel_path.split(os.path.sep)
path_lst = path.split(os.path.sep)
new_path = ""
for i, mrp in enumerate(model_rel_path_lst):
if mrp != path_lst[i]:
new_path = os.path.join(new_path, path_lst[i])
for rp in path_lst[len(model_rel_path_lst) :]:
new_path = os.path.join(new_path, rp)
return new_path
[docs] @staticmethod
def unique_file_name(file_name, lookup):
"""Generate a unique file name. For internal FloPy use, not intended
for end user."""
num = 0
while MFFileMgmt._build_file(file_name, num) in lookup:
num += 1
return MFFileMgmt._build_file(file_name, num)
@staticmethod
def _build_file(file_name, num):
file, ext = os.path.splitext(file_name)
if ext:
return f"{file}_{num}{ext}"
else:
return f"{file}_{num}"
[docs] @staticmethod
def string_to_file_path(fp_string):
"""Interpret string as a file path. For internal FloPy use, not
intended for end user."""
file_delimiters = ["/", "\\"]
new_string = fp_string
for delimiter in file_delimiters:
arr_string = new_string.split(delimiter)
if len(arr_string) > 1:
if os.path.isabs(fp_string):
if not arr_string[0] and not arr_string[1]:
new_string = f"{delimiter}{delimiter}"
else:
new_string = (
f"{arr_string[0]}{delimiter}{arr_string[1]}"
)
else:
new_string = os.path.join(arr_string[0], arr_string[1])
if len(arr_string) > 2:
for path_piece in arr_string[2:]:
new_string = os.path.join(new_string, path_piece)
return new_string
[docs] def set_last_accessed_path(self):
"""Set the last accessed simulation path to the current simulation
path. For internal FloPy use, not intended for end user."""
self._last_loaded_sim_path = self._sim_path
self.set_last_accessed_model_path()
[docs] def set_last_accessed_model_path(self):
"""Set the last accessed model path to the current model path.
For internal FloPy use, not intended for end user."""
for key, item in self.model_relative_path.items():
self._last_loaded_model_relative_path[key] = copy.deepcopy(item)
[docs] def get_model_path(self, key, last_loaded_path=False):
"""Returns the model working path for the model `key`.
Parameters
----------
key : str
Model name whose path flopy will retrieve
last_loaded_path : bool
Get the last path loaded by FloPy which may not be the most
recent path.
Returns
-------
model path : str
"""
if last_loaded_path:
return os.path.join(
self._last_loaded_sim_path,
self._last_loaded_model_relative_path[key],
)
else:
if key in self.model_relative_path:
return os.path.join(
self._sim_path, self.model_relative_path[key]
)
else:
return self._sim_path
[docs] def get_sim_path(self, last_loaded_path=False):
"""Get the simulation path."""
if last_loaded_path:
return self._last_loaded_sim_path
else:
return self._sim_path
[docs] def add_ext_file(self, file_path, model_name):
"""Add an external file to the path list. For internal FloPy use, not
intended for end user."""
if file_path in self.existing_file_dict:
if model_name not in self.existing_file_dict[file_path].model_name:
self.existing_file_dict[file_path].model_name[model_name] = 0
else:
new_file_path = MFFilePath(file_path, model_name)
self.existing_file_dict[file_path] = new_file_path
[docs] def set_sim_path(self, path, internal_use=False):
"""
Set the file path to the simulation files. Internal use only,
call MFSimulation's set_sim_path method instead.
Parameters
----------
path : str
Full path or relative path from working directory to
simulation folder
Returns
-------
Examples
--------
self.simulation_data.mfdata.set_sim_path('sim_folder')
"""
if not internal_use:
print(
"WARNING: MFFileMgt's set_sim_path has been deprecated. "
"Please use MFSimulation's set_sim_path in the future."
)
if self.simulation is not None:
self.simulation.set_sim_path(path)
return
# recalculate paths for everything
# resolve path type
path = self.string_to_file_path(path)
if os.path.isabs(path):
self._sim_path = path
else:
# assume path is relative to working directory
self._sim_path = os.path.join(os.getcwd(), path)
[docs] def resolve_path(
self, path, model_name, last_loaded_path=False, move_abs_paths=False
):
"""Resolve a simulation or model path. For internal FloPy use, not
intended for end user."""
if isinstance(path, MFFilePath):
file_path = path.file_path
else:
file_path = path
# remove quote characters from file path
file_path = file_path.replace("'", "")
file_path = file_path.replace('"', "")
if os.path.isabs(file_path):
# path is an absolute path
if move_abs_paths:
return self.get_sim_path(last_loaded_path)
else:
return file_path
else:
# path is a relative path
return os.path.join(self.get_sim_path(last_loaded_path), file_path)
[docs]class PackageContainer:
"""
Base class for any class containing packages.
Parameters
----------
simulation_data : SimulationData
The simulation's SimulationData object
name : str
Name of the package container object
Attributes
----------
package_type_dict : dictionary
Dictionary of packages by package type
package_name_dict : dictionary
Dictionary of packages by package name
package_key_dict : dictionary
Dictionary of packages by package key
"""
modflow_packages = []
packages_by_abbr = {}
modflow_models = []
models_by_type = {}
def __init__(self, simulation_data, name):
self.type = "PackageContainer"
self.simulation_data = simulation_data
self.name = name
self._packagelist = []
self.package_type_dict = {}
self.package_name_dict = {}
self.package_key_dict = {}
[docs] @staticmethod
def package_list():
"""Static method that returns the list of available packages.
For internal FloPy use only, not intended for end users.
Returns a list of MFPackage subclasses
"""
# all packages except "group" classes
package_list = []
for abbr, package in sorted(PackageContainer.packages_by_abbr.items()):
# don't store packages "group" classes
if not abbr.endswith("packages"):
package_list.append(package)
return package_list
[docs] @staticmethod
def package_factory(package_type: str, model_type: str):
"""Static method that returns the appropriate package type object based
on the package_type and model_type strings. For internal FloPy use
only, not intended for end users.
Parameters
----------
package_type : str
Type of package to create
model_type : str
Type of model that package is a part of
Returns
-------
package : MFPackage subclass
"""
package_abbr = f"{model_type}{package_type}"
factory = PackageContainer.packages_by_abbr.get(package_abbr)
if factory is None:
package_utl_abbr = "utl{}".format(package_type)
factory = PackageContainer.packages_by_abbr.get(package_utl_abbr)
return factory
[docs] @staticmethod
def model_factory(model_type):
"""Static method that returns the appropriate model type object based
on the model_type string. For internal FloPy use only, not intended
for end users.
Parameters
----------
model_type : str
Type of model that package is a part of
Returns
-------
model : MFModel subclass
"""
return PackageContainer.models_by_type.get(model_type)
[docs] @staticmethod
def get_module_val(module, item, attrb):
"""Static method that returns a python class module value. For
internal FloPy use only, not intended for end users."""
value = getattr(module, item)
# verify this is a class
if (
not value
or not inspect.isclass(value)
or not hasattr(value, attrb)
):
return None
return value
@property
def package_dict(self):
"""Returns a copy of the package name dictionary."""
return self.package_name_dict.copy()
@property
def package_names(self):
"""Returns a list of package names."""
return list(self.package_name_dict.keys())
def _add_package(self, package, path):
# put in packages list and update lookup dictionaries
self._packagelist.append(package)
if package.package_name is not None:
self.package_name_dict[package.package_name.lower()] = package
self.package_key_dict[path[-1].lower()] = package
if package.package_type not in self.package_type_dict:
self.package_type_dict[package.package_type.lower()] = []
self.package_type_dict[package.package_type.lower()].append(package)
def _remove_package(self, package):
self._packagelist.remove(package)
if (
package.package_name is not None
and package.package_name.lower() in self.package_name_dict
):
del self.package_name_dict[package.package_name.lower()]
del self.package_key_dict[package.path[-1].lower()]
package_list = self.package_type_dict[package.package_type.lower()]
package_list.remove(package)
if len(package_list) == 0:
del self.package_type_dict[package.package_type.lower()]
# collect keys of items to be removed from main dictionary
items_to_remove = []
for key in self.simulation_data.mfdata:
is_subkey = True
for pitem, ditem in zip(package.path, key):
if pitem != ditem:
is_subkey = False
break
if is_subkey:
items_to_remove.append(key)
# remove items from main dictionary
for key in items_to_remove:
del self.simulation_data.mfdata[key]
def _rename_package(self, package, new_name):
# fix package_name_dict key
if (
package.package_name is not None
and package.package_name.lower() in self.package_name_dict
):
del self.package_name_dict[package.package_name.lower()]
self.package_name_dict[new_name.lower()] = package
# fix package_key_dict key
new_package_path = package.path[:-1] + (new_name,)
del self.package_key_dict[package.path[-1].lower()]
self.package_key_dict[new_package_path.lower()] = package
# get keys to fix in main dictionary
main_dict = self.simulation_data.mfdata
items_to_fix = []
for key in main_dict:
is_subkey = True
for pitem, ditem in zip(package.path, key):
if pitem != ditem:
is_subkey = False
break
if is_subkey:
items_to_fix.append(key)
# fix keys in main dictionary
for key in items_to_fix:
new_key = (
package.path[:-1] + (new_name,) + key[len(package.path) - 1 :]
)
main_dict[new_key] = main_dict.pop(key)
[docs] def get_package(self, name=None):
"""
Finds a package by package name, package key, package type, or partial
package name. returns either a single package, a list of packages,
or None.
Parameters
----------
name : str
Name of the package, 'RIV', 'LPF', etc.
Returns
-------
pp : Package object
"""
if name is None:
return self._packagelist[:]
# search for full package name
if name.lower() in self.package_name_dict:
return self.package_name_dict[name.lower()]
# search for package type
if name.lower() in self.package_type_dict:
if len(self.package_type_dict[name.lower()]) == 0:
return None
elif len(self.package_type_dict[name.lower()]) == 1:
return self.package_type_dict[name.lower()][0]
else:
return self.package_type_dict[name.lower()]
# search for package key
if name.lower() in self.package_key_dict:
return self.package_key_dict[name.lower()]
# search for partial and case-insensitive package name
for pp in self._packagelist:
if pp.package_name is not None:
# get first package of the type requested
package_name = pp.package_name.lower()
if len(package_name) > len(name):
package_name = package_name[0 : len(name)]
if package_name.lower() == name.lower():
return pp
return None
[docs] def register_package(self, package):
"""Base method for registering a package. Should be overridden."""
path = (package.package_name,)
return (path, None)
@staticmethod
def _load_only_dict(load_only):
if load_only is None:
return None
if isinstance(load_only, dict):
return load_only
if not isinstance(load_only, Iterable):
raise FlopyException(
"load_only must be iterable or None. "
'load_only value of "{}" is '
"invalid".format(load_only)
)
load_only_dict = {}
for item in load_only:
load_only_dict[item.lower()] = True
return load_only_dict
@staticmethod
def _in_pkg_list(pkg_list, pkg_type, pkg_name):
if pkg_type is not None:
pkg_type = pkg_type.lower()
if pkg_name is not None:
pkg_name = pkg_name.lower()
if pkg_type in pkg_list or pkg_name in pkg_list:
return True
# split to make cases like "gwf6-gwf6" easier to process
pkg_type = pkg_type.split("-")
try:
# if there is a number on the end of the package try
# excluding it
int(pkg_type[0][-1])
for key in pkg_list.keys():
key = key.split("-")
if len(key) == len(pkg_type):
matches = True
for key_item, pkg_item in zip(key, pkg_type):
if pkg_item[0:-1] != key_item and pkg_item != key_item:
matches = False
if matches:
return True
except ValueError:
return False
return False