import copy
import datetime
import errno
import inspect
import os
import sys
from re import S
import numpy as np
from ..mbase import ModelInterface
from ..pakbase import PackageInterface
from ..utils import datautil
from ..utils.check import mf6check
from ..version import __version__
from .coordinates import modeldimensions
from .data import mfdata, mfdataarray, mfdatalist, mfdatascalar, mfstructure
from .data.mfdatautil import DataSearchOutput, MFComment, cellids_equal
from .data.mfstructure import DatumType, MFDataItemStructure, MFStructure
from .mfbase import (
ExtFileAction,
FlopyException,
MFDataException,
MFFileMgmt,
MFInvalidTransientBlockHeaderException,
PackageContainer,
PackageContainerType,
ReadAsArraysException,
VerbosityLevel,
)
from .utils.output_util import MF6Output
[docs]
class MFBlock:
"""
Represents a block in a MF6 input file. This class is used internally
by FloPy and use by users of the FloPy library is not recommended.
Parameters
----------
simulation_data : MFSimulationData
Data specific to this simulation
dimensions : MFDimensions
Describes model dimensions including model grid and simulation time
structure : MFVariableStructure
Structure describing block
path : tuple
Unique path to block
Attributes
----------
block_headers : MFBlockHeader
Block header text (BEGIN/END), header variables, comments in the
header
structure : MFBlockStructure
Structure describing block
path : tuple
Unique path to block
datasets : OrderDict
Dictionary of dataset objects with keys that are the name of the
dataset
datasets_keyword : dict
Dictionary of dataset objects with keys that are key words to identify
start of dataset
enabled : bool
If block is being used in the simulation
"""
def __init__(
self,
simulation_data,
dimensions,
structure,
path,
model_or_sim,
container_package,
):
self._simulation_data = simulation_data
self._dimensions = dimensions
self._model_or_sim = model_or_sim
self._container_package = container_package
self.block_headers = [
MFBlockHeader(
structure.name,
[],
MFComment("", path, simulation_data, 0),
simulation_data,
path,
self,
)
]
self.structure = structure
self.path = path
self.datasets = {}
self.datasets_keyword = {}
# initially disable if optional
self.enabled = structure.number_non_optional_data() > 0
self.loaded = False
self.external_file_name = None
self._structure_init()
def __repr__(self):
return self._get_data_str(True)
def __str__(self):
return self._get_data_str(False)
def _get_data_str(self, formal):
data_str = ""
for dataset in self.datasets.values():
if formal:
ds_repr = repr(dataset)
if len(ds_repr.strip()) > 0:
data_str = (
f"{data_str}{dataset.structure.name}\n{dataset!r}\n"
)
else:
ds_str = str(dataset)
if len(ds_str.strip()) > 0:
data_str = (
f"{data_str}{dataset.structure.name}\n{dataset!s}\n"
)
return data_str
# return an MFScalar, MFList, or MFArray
[docs]
def data_factory(
self,
sim_data,
model_or_sim,
structure,
enable,
path,
dimensions,
data=None,
package=None,
):
"""Creates the appropriate data child object derived from MFData."""
data_type = structure.get_datatype()
# examine the data structure and determine the data type
if (
data_type == mfstructure.DataType.scalar_keyword
or data_type == mfstructure.DataType.scalar
):
return mfdatascalar.MFScalar(
sim_data,
model_or_sim,
structure,
data,
enable,
path,
dimensions,
)
elif (
data_type == mfstructure.DataType.scalar_keyword_transient
or data_type == mfstructure.DataType.scalar_transient
):
trans_scalar = mfdatascalar.MFScalarTransient(
sim_data, model_or_sim, structure, enable, path, dimensions
)
if data is not None:
trans_scalar.set_data(data, key=0)
return trans_scalar
elif data_type == mfstructure.DataType.array:
return mfdataarray.MFArray(
sim_data,
model_or_sim,
structure,
data,
enable,
path,
dimensions,
self,
)
elif data_type == mfstructure.DataType.array_transient:
trans_array = mfdataarray.MFTransientArray(
sim_data,
model_or_sim,
structure,
enable,
path,
dimensions,
self,
)
if data is not None:
trans_array.set_data(data, key=0)
return trans_array
elif data_type == mfstructure.DataType.list:
return mfdatalist.MFList(
sim_data,
model_or_sim,
structure,
data,
enable,
path,
dimensions,
package,
self,
)
elif data_type == mfstructure.DataType.list_transient:
trans_list = mfdatalist.MFTransientList(
sim_data,
model_or_sim,
structure,
enable,
path,
dimensions,
package,
self,
)
if data is not None:
trans_list.set_data(data, key=0, autofill=True)
return trans_list
elif data_type == mfstructure.DataType.list_multiple:
mult_list = mfdatalist.MFMultipleList(
sim_data,
model_or_sim,
structure,
enable,
path,
dimensions,
package,
self,
)
if data is not None:
mult_list.set_data(data, key=0, autofill=True)
return mult_list
def _structure_init(self):
# load datasets keywords into dictionary
for dataset_struct in self.structure.data_structures.values():
for keyword in dataset_struct.get_keywords():
self.datasets_keyword[keyword] = dataset_struct
# load block header data items into dictionary
for dataset in self.structure.block_header_structure:
self._new_dataset(dataset.name, dataset, True, None)
[docs]
def set_model_relative_path(self, model_ws):
"""Sets `model_ws` as the model path relative to the simulation's
path.
Parameters
----------
model_ws : str
Model path relative to the simulation's path.
"""
# update datasets
for key, dataset in self.datasets.items():
if dataset.structure.file_data:
try:
file_data = dataset.get_data()
except MFDataException as mfde:
raise MFDataException(
mfdata_except=mfde,
model=self._container_package.model_name,
package=self._container_package._get_pname(),
message="Error occurred while "
"getting file data from "
'"{}"'.format(dataset.structure.name),
)
if file_data:
# update file path location for all file paths
for file_line in file_data:
old_file_name = os.path.split(file_line[0])[1]
file_line[0] = os.path.join(model_ws, old_file_name)
# update block headers
for block_header in self.block_headers:
for dataset in block_header.data_items:
if dataset.structure.file_data:
try:
file_data = dataset.get_data()
except MFDataException as mfde:
raise MFDataException(
mfdata_except=mfde,
model=self._container_package.model_name,
package=self._container_package._get_pname(),
message="Error occurred while "
"getting file data from "
'"{}"'.format(dataset.structure.name),
)
if file_data:
# update file path location for all file paths
for file_line in file_data:
old_file_path, old_file_name = os.path.split(
file_line[1]
)
new_file_path = os.path.join(
model_ws, old_file_name
)
# update transient keys of datasets within the
# block
for key, idataset in self.datasets.items():
if isinstance(idataset, mfdata.MFTransient):
idataset.update_transient_key(
file_line[1], new_file_path
)
file_line[1] = os.path.join(
model_ws, old_file_name
)
[docs]
def add_dataset(self, dataset_struct, data, var_path):
"""Add data to this block."""
try:
self.datasets[var_path[-1]] = self.data_factory(
self._simulation_data,
self._model_or_sim,
dataset_struct,
True,
var_path,
self._dimensions,
data,
self._container_package,
)
except MFDataException as mfde:
raise MFDataException(
mfdata_except=mfde,
model=self._container_package.model_name,
package=self._container_package._get_pname(),
message="Error occurred while adding"
' dataset "{}" to block '
'"{}"'.format(dataset_struct.name, self.structure.name),
)
self._simulation_data.mfdata[var_path] = self.datasets[var_path[-1]]
dtype = dataset_struct.get_datatype()
if (
dtype == mfstructure.DataType.list_transient
or dtype == mfstructure.DataType.list_multiple
or dtype == mfstructure.DataType.array_transient
):
# build repeating block header(s)
if isinstance(data, dict):
# Add block headers for each dictionary key
for index in data:
if isinstance(index, tuple):
header_list = list(index)
else:
header_list = [index]
self._build_repeating_header(header_list)
elif isinstance(data, list):
# Add a single block header of value 0
self._build_repeating_header([0])
elif (
dtype != mfstructure.DataType.list_multiple
and data is not None
):
self._build_repeating_header([[0]])
return self.datasets[var_path[-1]]
def _build_repeating_header(self, header_data):
if self.header_exists(header_data[0]):
return
if (
len(self.block_headers[-1].data_items) == 1
and self.block_headers[-1].data_items[0].get_data() is not None
):
block_header_path = self.path + (len(self.block_headers) + 1,)
block_header = MFBlockHeader(
self.structure.name,
[],
MFComment("", self.path, self._simulation_data, 0),
self._simulation_data,
block_header_path,
self,
)
self.block_headers.append(block_header)
else:
block_header_path = self.path + (len(self.block_headers),)
struct = self.structure
last_header = self.block_headers[-1]
try:
last_header.build_header_variables(
self._simulation_data,
struct.block_header_structure,
block_header_path,
header_data,
self._dimensions,
)
except MFDataException as mfde:
raise MFDataException(
mfdata_except=mfde,
model=self._container_package.model_name,
package=self._container_package._get_pname(),
message="Error occurred while building"
" block header variables for block "
'"{}"'.format(last_header.name),
)
def _new_dataset(
self, key, dataset_struct, block_header=False, initial_val=None
):
dataset_path = self.path + (key,)
if block_header:
if (
dataset_struct.type == DatumType.integer
and initial_val is not None
and len(initial_val) >= 1
and dataset_struct.get_record_size()[0] == 1
):
# stress periods are stored 0 based
initial_val = int(initial_val[0]) - 1
if isinstance(initial_val, list):
initial_val_path = tuple(initial_val)
initial_val = [tuple(initial_val)]
else:
initial_val_path = initial_val
try:
new_data = self.data_factory(
self._simulation_data,
self._model_or_sim,
dataset_struct,
True,
dataset_path,
self._dimensions,
initial_val,
self._container_package,
)
except MFDataException as mfde:
raise MFDataException(
mfdata_except=mfde,
model=self._container_package.model_name,
package=self._container_package._get_pname(),
message="Error occurred while adding"
' dataset "{}" to block '
'"{}"'.format(dataset_struct.name, self.structure.name),
)
self.block_headers[-1].add_data_item(new_data, initial_val_path)
else:
try:
self.datasets[key] = self.data_factory(
self._simulation_data,
self._model_or_sim,
dataset_struct,
True,
dataset_path,
self._dimensions,
initial_val,
self._container_package,
)
except MFDataException as mfde:
raise MFDataException(
mfdata_except=mfde,
model=self._container_package.model_name,
package=self._container_package._get_pname(),
message="Error occurred while adding"
' dataset "{}" to block '
'"{}"'.format(dataset_struct.name, self.structure.name),
)
for keyword in dataset_struct.get_keywords():
self.datasets_keyword[keyword] = dataset_struct
[docs]
def is_empty(self):
"""Returns true if this block is empty."""
for key, dataset in self.datasets.items():
try:
has_data = dataset.has_data()
except MFDataException as mfde:
raise MFDataException(
mfdata_except=mfde,
model=self._container_package.model_name,
package=self._container_package._get_pname(),
message="Error occurred while verifying"
' data of dataset "{}" in block '
'"{}"'.format(dataset.structure.name, self.structure.name),
)
if has_data is not None and has_data:
return False
return True
[docs]
def load(self, block_header, fd, strict=True):
"""Loads block from file object. file object must be advanced to
beginning of block before calling.
Parameters
----------
block_header : MFBlockHeader
Block header for block block being loaded.
fd : file
File descriptor of file being loaded
strict : bool
Enforce strict MODFLOW 6 file format.
"""
# verify number of header variables
if (
len(block_header.variable_strings)
< self.structure.number_non_optional_block_header_data()
):
if (
self._simulation_data.verbosity_level.value
>= VerbosityLevel.normal.value
):
warning_str = (
'WARNING: Block header for block "{}" does not '
"contain the correct number of "
"variables {}".format(block_header.name, self.path)
)
print(warning_str)
return
if self.loaded:
# verify header has not already been loaded
for bh_current in self.block_headers:
if bh_current.is_same_header(block_header):
if (
self._simulation_data.verbosity_level.value
>= VerbosityLevel.normal.value
):
warning_str = (
'WARNING: Block header for block "{}" is '
"not a unique block header "
"{}".format(block_header.name, self.path)
)
print(warning_str)
return
# init
self.enabled = True
if not self.loaded:
self.block_headers = []
block_header.block = self
self.block_headers.append(block_header)
# process any header variable
if len(self.structure.block_header_structure) > 0:
dataset = self.structure.block_header_structure[0]
self._new_dataset(
dataset.name,
dataset,
True,
self.block_headers[-1].variable_strings,
)
# handle special readasarrays case
if self._container_package.structure.read_as_arrays:
# auxiliary variables may appear with aux variable name as keyword
aux_vars = self._container_package.auxiliary.get_data()
if aux_vars is not None:
for var_name in list(aux_vars[0])[1:]:
self.datasets_keyword[
(var_name,)
] = self._container_package.aux.structure
comments = []
# capture any initial comments
initial_comment = MFComment("", "", 0)
fd_block = fd
line = fd_block.readline()
datautil.PyListUtil.reset_delimiter_used()
arr_line = datautil.PyListUtil.split_data_line(line)
post_data_comments = MFComment("", "", self._simulation_data, 0)
while MFComment.is_comment(line, True):
initial_comment.add_text(line)
line = fd_block.readline()
arr_line = datautil.PyListUtil.split_data_line(line)
# if block not empty
external_file_info = None
if not (len(arr_line[0]) > 2 and arr_line[0][:3].upper() == "END"):
if arr_line[0].lower() == "open/close":
# open block contents from external file
fd_block.readline()
root_path = self._simulation_data.mfpath.get_sim_path()
try:
file_name = os.path.split(arr_line[1])[-1]
if (
self._simulation_data.verbosity_level.value
>= VerbosityLevel.verbose.value
):
print(
f' opening external file "{file_name}"...'
)
external_file_info = arr_line
except:
type_, value_, traceback_ = sys.exc_info()
message = f'Error reading external file specified in line "{line}"'
raise MFDataException(
self._container_package.model_name,
self._container_package._get_pname(),
self.path,
"reading external file",
self.structure.name,
inspect.stack()[0][3],
type_,
value_,
traceback_,
message,
self._simulation_data.debug,
)
if len(self.structure.data_structures) <= 1:
# load a single data set
dataset = self.datasets[next(iter(self.datasets))]
try:
if (
self._simulation_data.verbosity_level.value
>= VerbosityLevel.verbose.value
):
print(
f" loading data {dataset.structure.name}..."
)
next_line = dataset.load(
line,
fd_block,
self.block_headers[-1],
initial_comment,
external_file_info,
)
except MFDataException as mfde:
raise MFDataException(
mfdata_except=mfde,
model=self._container_package.model_name,
package=self._container_package._get_pname(),
message='Error occurred while loading data "{}" in '
'block "{}" from file "{}"'
".".format(
dataset.structure.name,
self.structure.name,
fd_block.name,
),
)
package_info_list = self._get_package_info(dataset)
if package_info_list is not None:
for package_info in package_info_list:
if (
self._simulation_data.verbosity_level.value
>= VerbosityLevel.verbose.value
):
print(
f" loading child package {package_info[0]}..."
)
fname = package_info[1]
if package_info[2] is not None:
fname = os.path.join(package_info[2], fname)
filemgr = self._simulation_data.mfpath
fname = filemgr.strip_model_relative_path(
self._model_or_sim.name, fname
)
pkg = self._model_or_sim.load_package(
package_info[0],
fname,
package_info[1],
True,
"",
package_info[3],
self._container_package,
)
if hasattr(self._container_package, package_info[0]):
package_group = getattr(
self._container_package, package_info[0]
)
package_group._append_package(
pkg, pkg.filename, False
)
if next_line[1] is not None:
arr_line = datautil.PyListUtil.split_data_line(
next_line[1]
)
else:
arr_line = ""
# capture any trailing comments
dataset.post_data_comments = post_data_comments
while arr_line and (
len(next_line[1]) <= 2 or arr_line[0][:3].upper() != "END"
):
next_line[1] = fd_block.readline().strip()
arr_line = datautil.PyListUtil.split_data_line(
next_line[1]
)
if arr_line and (
len(next_line[1]) <= 2
or arr_line[0][:3].upper() != "END"
):
post_data_comments.add_text(" ".join(arr_line))
else:
# look for keyword and store line as data or comment
try:
key, results = self._find_data_by_keyword(
line, fd_block, initial_comment
)
except MFInvalidTransientBlockHeaderException as e:
warning_str = f"WARNING: {e}"
print(warning_str)
self.block_headers.pop()
return
self._save_comments(arr_line, line, key, comments)
if results[1] is None or results[1][:3].upper() != "END":
# block consists of unordered datasets
# load the data sets out of order based on
# initial constants
line = " "
while line != "":
line = fd_block.readline()
arr_line = datautil.PyListUtil.split_data_line(line)
if arr_line:
# determine if at end of block
if (
len(arr_line[0]) > 2
and arr_line[0][:3].upper() == "END"
):
break
# look for keyword and store line as data o
# r comment
key, result = self._find_data_by_keyword(
line, fd_block, initial_comment
)
self._save_comments(arr_line, line, key, comments)
if (
result[1] is not None
and result[1][:3].upper() == "END"
):
break
self.loaded = True
self.is_valid()
def _find_data_by_keyword(self, line, fd, initial_comment):
first_key = None
nothing_found = False
next_line = [True, line]
while next_line[0] and not nothing_found:
arr_line = datautil.PyListUtil.split_data_line(next_line[1])
key = datautil.find_keyword(arr_line, self.datasets_keyword)
if key is not None:
ds_name = self.datasets_keyword[key].name
try:
if (
self._simulation_data.verbosity_level.value
>= VerbosityLevel.verbose.value
):
print(f" loading data {ds_name}...")
next_line = self.datasets[ds_name].load(
next_line[1],
fd,
self.block_headers[-1],
initial_comment,
)
except MFDataException as mfde:
raise MFDataException(
mfdata_except=mfde,
model=self._container_package.model_name,
package=self._container_package._get_pname(),
message="Error occurred while "
'loading data "{}" in '
'block "{}" from file "{}"'
".".format(ds_name, self.structure.name, fd.name),
)
# see if first item's name indicates a reference to
# another package
package_info_list = self._get_package_info(
self.datasets[ds_name]
)
if package_info_list is not None:
for package_info in package_info_list:
if (
self._simulation_data.verbosity_level.value
>= VerbosityLevel.verbose.value
):
print(
f" loading child package {package_info[1]}..."
)
fname = package_info[1]
if package_info[2] is not None:
fname = os.path.join(package_info[2], fname)
filemgr = self._simulation_data.mfpath
fname = filemgr.strip_model_relative_path(
self._model_or_sim.name, fname
)
pkg = self._model_or_sim.load_package(
package_info[0],
fname,
package_info[1],
True,
"",
package_info[3],
self._container_package,
)
if hasattr(self._container_package, package_info[0]):
package_group = getattr(
self._container_package, package_info[0]
)
package_group._append_package(
pkg, pkg.filename, False
)
if first_key is None:
first_key = key
nothing_found = False
elif (
arr_line[0].lower() == "readasarrays"
and self.path[-1].lower() == "options"
and self._container_package.structure.read_as_arrays == False
):
error_msg = (
"ERROR: Attempting to read a ReadAsArrays "
"package as a non-ReadAsArrays "
"package {}".format(self.path)
)
raise ReadAsArraysException(error_msg)
else:
nothing_found = True
if first_key is None:
# look for recarrays. if there is a lone recarray in this block,
# use it by default
recarrays = self.structure.get_all_recarrays()
if len(recarrays) != 1:
return key, [None, None]
dataset = self.datasets[recarrays[0].name]
ds_result = dataset.load(
line, fd, self.block_headers[-1], initial_comment
)
# see if first item's name indicates a reference to another
# package
package_info_list = self._get_package_info(dataset)
if package_info_list is not None:
for package_info in package_info_list:
if (
self._simulation_data.verbosity_level.value
>= VerbosityLevel.verbose.value
):
print(
f" loading child package {package_info[0]}..."
)
fname = package_info[1]
if package_info[2] is not None:
fname = os.path.join(package_info[2], fname)
filemgr = self._simulation_data.mfpath
fname = filemgr.strip_model_relative_path(
self._model_or_sim.name, fname
)
pkg = self._model_or_sim.load_package(
package_info[0],
fname,
None,
True,
"",
package_info[3],
self._container_package,
)
if hasattr(self._container_package, package_info[0]):
package_group = getattr(
self._container_package, package_info[0]
)
package_group._append_package(pkg, pkg.filename, False)
return recarrays[0].keyword, ds_result
else:
return first_key, next_line
def _get_package_info(self, dataset):
if not dataset.structure.file_data:
return None
for index in range(0, len(dataset.structure.data_item_structures)):
data_item = dataset.structure.data_item_structures[index]
if (
data_item.type == DatumType.keyword
or data_item.type == DatumType.string
):
item_name = data_item.name
package_type = item_name[:-1]
model_type = self._model_or_sim.structure.model_type
# not all packages have the same naming convention
# try different naming conventions to find the appropriate
# package
package_types = [
package_type,
f"{self._container_package.package_type}"
f"{package_type}",
]
package_type_found = None
for ptype in package_types:
if (
PackageContainer.package_factory(ptype, model_type)
is not None
):
package_type_found = ptype
break
if package_type_found is not None:
try:
data = dataset.get_data()
except MFDataException as mfde:
raise MFDataException(
mfdata_except=mfde,
model=self._container_package.model_name,
package=self._container_package._get_pname(),
message="Error occurred while "
'getting data from "{}" '
'in block "{}".'.format(
dataset.structure.name, self.structure.name
),
)
package_info_list = []
if isinstance(data, np.recarray):
for row in data:
self._add_to_info_list(
package_info_list,
row[index],
package_type_found,
)
else:
self._add_to_info_list(
package_info_list, data, package_type_found
)
return package_info_list
return None
return None
def _add_to_info_list(
self, package_info_list, file_location, package_type_found
):
file_path, file_name = os.path.split(file_location)
dict_package_name = f"{package_type_found}_{self.path[-2]}"
package_info_list.append(
(
package_type_found,
file_name,
file_path,
dict_package_name,
)
)
def _save_comments(self, arr_line, line, key, comments):
# FIX: Save these comments somewhere in the data set
if not key in self.datasets_keyword:
if MFComment.is_comment(key, True):
if comments:
comments.append("\n")
comments.append(arr_line)
[docs]
def write(self, fd, ext_file_action=ExtFileAction.copy_relative_paths):
"""Writes block to a file object.
Parameters
----------
fd : file object
File object to write to.
"""
# never write an empty block
is_empty = self.is_empty()
if (
is_empty
and self.structure.name.lower() != "exchanges"
and self.structure.name.lower() != "options"
and self.structure.name.lower() != "sources"
and self.structure.name.lower() != "stressperioddata"
):
return
if self.structure.repeating():
repeating_datasets = self._find_repeating_datasets()
for repeating_dataset in repeating_datasets:
# resolve any missing block headers
self._add_missing_block_headers(repeating_dataset)
for block_header in sorted(self.block_headers):
# write block
self._write_block(fd, block_header, ext_file_action)
else:
self._write_block(fd, self.block_headers[0], ext_file_action)
def _add_missing_block_headers(self, repeating_dataset):
key_data_list = repeating_dataset.get_active_key_list()
# assemble a dictionary of data keys and empty keys
key_dict = {}
for key in key_data_list:
key_dict[key[0]] = True
for key, value in repeating_dataset.empty_keys.items():
if value:
key_dict[key] = True
for key in key_dict.keys():
has_data = repeating_dataset.has_data(key)
empty_key = (
key in repeating_dataset.empty_keys
and repeating_dataset.empty_keys[key]
)
if not self.header_exists(key) and (has_data or empty_key):
self._build_repeating_header([key])
[docs]
def set_all_data_external(
self, base_name, check_data=True, external_data_folder=None
):
"""Sets the block's list and array data to be stored externally,
base_name is external file name's prefix, check_data determines
if data error checking is enabled during this process.
Parameters
----------
base_name : str
Base file name of external files where data will be written to.
check_data : bool
Whether to do data error checking.
external_data_folder
Folder where external data will be stored
"""
for key, dataset in self.datasets.items():
if (
isinstance(dataset, mfdataarray.MFArray)
or (
isinstance(dataset, mfdatalist.MFList)
and dataset.structure.type == DatumType.recarray
)
and dataset.enabled
):
file_path = f"{base_name}_{dataset.structure.name}.txt"
replace_existing_external = False
if external_data_folder is not None:
# get simulation root path
root_path = self._simulation_data.mfpath.get_sim_path()
# get model relative path, if it exists
if isinstance(self._model_or_sim, ModelInterface):
name = self._model_or_sim.name
rel_path = (
self._simulation_data.mfpath.model_relative_path[
name
]
)
if rel_path is not None:
root_path = os.path.join(root_path, rel_path)
full_path = os.path.join(root_path, external_data_folder)
if not os.path.exists(full_path):
# create new external data folder
os.makedirs(full_path)
file_path = os.path.join(external_data_folder, file_path)
replace_existing_external = True
dataset.store_as_external_file(
file_path,
replace_existing_external=replace_existing_external,
check_data=check_data,
)
[docs]
def set_all_data_internal(self, check_data=True):
"""Sets the block's list and array data to be stored internally,
check_data determines if data error checking is enabled during this
process.
Parameters
----------
check_data : bool
Whether to do data error checking.
"""
for key, dataset in self.datasets.items():
if (
isinstance(dataset, mfdataarray.MFArray)
or (
isinstance(dataset, mfdatalist.MFList)
and dataset.structure.type == DatumType.recarray
)
and dataset.enabled
):
dataset.store_internal(check_data=check_data)
def _find_repeating_datasets(self):
repeating_datasets = []
for key, dataset in self.datasets.items():
if dataset.repeating:
repeating_datasets.append(dataset)
return repeating_datasets
def _write_block(self, fd, block_header, ext_file_action):
transient_key = None
if len(block_header.data_items) > 0:
transient_key = block_header.get_transient_key()
# gather data sets to write
data_set_output = []
data_found = False
for key, dataset in self.datasets.items():
try:
if transient_key is None:
if (
self._simulation_data.verbosity_level.value
>= VerbosityLevel.verbose.value
):
print(
f" writing data {dataset.structure.name}..."
)
data_set_output.append(
dataset.get_file_entry(ext_file_action=ext_file_action)
)
data_found = True
else:
if (
self._simulation_data.verbosity_level.value
>= VerbosityLevel.verbose.value
):
print(
" writing data {} ({}).."
".".format(dataset.structure.name, transient_key)
)
if dataset.repeating:
output = dataset.get_file_entry(
transient_key, ext_file_action=ext_file_action
)
if output is not None:
data_set_output.append(output)
data_found = True
else:
data_set_output.append(
dataset.get_file_entry(
ext_file_action=ext_file_action
)
)
data_found = True
except MFDataException as mfde:
raise MFDataException(
mfdata_except=mfde,
model=self._container_package.model_name,
package=self._container_package._get_pname(),
message=(
"Error occurred while writing data "
f'"{dataset.structure.name}" in block '
f'"{self.structure.name}" to file "{fd.name}"'
),
)
if not data_found:
return
# write block header
block_header.write_header(fd)
if self.external_file_name is not None:
# write block contents to external file
indent_string = self._simulation_data.indent_string
fd.write(f"{indent_string}open/close {self.external_file_name}\n")
fd_main = fd
fd_path = os.path.split(os.path.realpath(fd.name))[0]
try:
fd = open(os.path.join(fd_path, self.external_file_name), "w")
except:
type_, value_, traceback_ = sys.exc_info()
message = (
f'Error reading external file "{self.external_file_name}"'
)
raise MFDataException(
self._container_package.model_name,
self._container_package._get_pname(),
self.path,
"reading external file",
self.structure.name,
inspect.stack()[0][3],
type_,
value_,
traceback_,
message,
self._simulation_data.debug,
)
# write data sets
for output in data_set_output:
fd.write(output)
# write trailing comments
pth = block_header.blk_trailing_comment_path
if pth in self._simulation_data.mfdata:
self._simulation_data.mfdata[pth].write(fd)
if self.external_file_name is not None:
# switch back writing to package file
fd.close()
fd = fd_main
# write block footer
block_header.write_footer(fd)
# write post block comments
pth = block_header.blk_post_comment_path
if pth in self._simulation_data.mfdata:
self._simulation_data.mfdata[pth].write(fd)
# write extra line if comments are off
if not self._simulation_data.comments_on:
fd.write("\n")
[docs]
def is_allowed(self):
"""Determine if block is valid based on the values of dependant
MODFLOW variables."""
if self.structure.variable_dependant_path:
# fill in empty part of the path with the current path
if len(self.structure.variable_dependant_path) == 3:
dependant_var_path = (
self.path[0],
) + self.structure.variable_dependant_path
elif len(self.structure.variable_dependant_path) == 2:
dependant_var_path = (
self.path[0],
self.path[1],
) + self.structure.variable_dependant_path
elif len(self.structure.variable_dependant_path) == 1:
dependant_var_path = (
self.path[0],
self.path[1],
self.path[2],
) + self.structure.variable_dependant_path
else:
dependant_var_path = None
# get dependency
dependant_var = None
mf_data = self._simulation_data.mfdata
if dependant_var_path in mf_data:
dependant_var = mf_data[dependant_var_path]
# resolve dependency
if self.structure.variable_value_when_active[0] == "Exists":
exists = self.structure.variable_value_when_active[1]
if dependant_var and exists.lower() == "true":
return True
elif not dependant_var and exists.lower() == "false":
return True
else:
return False
elif not dependant_var:
return False
elif self.structure.variable_value_when_active[0] == ">":
min_val = self.structure.variable_value_when_active[1]
if dependant_var > float(min_val):
return True
else:
return False
elif self.structure.variable_value_when_active[0] == "<":
max_val = self.structure.variable_value_when_active[1]
if dependant_var < float(max_val):
return True
else:
return False
return True
[docs]
def is_valid(self):
"""Returns true of the block is valid."""
# check data sets
for dataset in self.datasets.values():
# Non-optional datasets must be enabled
if not dataset.structure.optional and not dataset.enabled:
return False
# Enabled blocks must be valid
if dataset.enabled and not dataset.is_valid:
return False
# check variables
for block_header in self.block_headers:
for dataset in block_header.data_items:
# Non-optional datasets must be enabled
if not dataset.structure.optional and not dataset.enabled:
return False
# Enabled blocks must be valid
if dataset.enabled and not dataset.is_valid():
return False
[docs]
class MFPackage(PackageContainer, PackageInterface):
"""
Provides an interface for the user to specify data to build a package.
Parameters
----------
parent : MFModel, MFSimulation, or MFPackage
The parent model, simulation, or package containing this package
package_type : str
String defining the package type
filename : str or PathLike
Name or path of file where this package is stored
quoted_filename : str
Filename with quotes around it when there is a space in the name
pname : str
Package name
loading_package : bool
Whether or not to add this package to the parent container's package
list during initialization
Attributes
----------
blocks : dict
Dictionary of blocks contained in this package by block name
path : tuple
Data dictionary path to this package
structure : PackageStructure
Describes the blocks and data contain in this package
dimensions : PackageDimension
Resolves data dimensions for data within this package
"""
def __init__(
self,
parent,
package_type,
filename=None,
pname=None,
loading_package=False,
**kwargs,
):
parent_file = kwargs.pop("parent_file", None)
if isinstance(parent, MFPackage):
self.model_or_sim = parent.model_or_sim
self.parent_file = parent
elif parent_file is not None:
self.model_or_sim = parent
self.parent_file = parent_file
else:
self.model_or_sim = parent
self.parent_file = None
_internal_package = kwargs.pop("_internal_package", False)
if _internal_package:
self.internal_package = True
else:
self.internal_package = False
self._data_list = []
self._package_type = package_type
if self.model_or_sim.type == "Model" and package_type.lower() != "nam":
self.model_name = self.model_or_sim.name
else:
self.model_name = None
# a package must have a dfn_file_name
if not hasattr(self, "dfn_file_name"):
self.dfn_file_name = ""
if (
self.model_or_sim.type != "Model"
and self.model_or_sim.type != "Simulation"
):
message = (
"Invalid model_or_sim parameter. Expecting either a "
'model or a simulation. Instead type "{}" was '
"given.".format(type(self.model_or_sim))
)
type_, value_, traceback_ = sys.exc_info()
raise MFDataException(
self.model_name,
pname,
"",
"initializing package",
None,
inspect.stack()[0][3],
type_,
value_,
traceback_,
message,
self.model_or_sim.simulation_data.debug,
)
super().__init__(self.model_or_sim.simulation_data, self.model_name)
self._simulation_data = self.model_or_sim.simulation_data
self.blocks = {}
self.container_type = []
self.loading_package = loading_package
if pname is not None:
if not isinstance(pname, str):
message = (
"Invalid pname parameter. Expecting type str. "
'Instead type "{}" was '
"given.".format(type(pname))
)
type_, value_, traceback_ = sys.exc_info()
raise MFDataException(
self.model_name,
pname,
"",
"initializing package",
None,
inspect.stack()[0][3],
type_,
value_,
traceback_,
message,
self.model_or_sim.simulation_data.debug,
)
self.package_name = pname.lower()
else:
self.package_name = None
if filename is None:
if self.model_or_sim.type == "Simulation":
# filename uses simulation base name
base_name = os.path.basename(
os.path.normpath(self.model_or_sim.name)
)
self._filename = f"{base_name}.{package_type}"
else:
# filename uses model base name
self._filename = f"{self.model_or_sim.name}.{package_type}"
else:
if not isinstance(filename, (str, os.PathLike)):
message = (
"Invalid fname parameter. Expecting type str. "
'Instead type "{}" was '
"given.".format(type(filename))
)
type_, value_, traceback_ = sys.exc_info()
raise MFDataException(
self.model_name,
pname,
"",
"initializing package",
None,
inspect.stack()[0][3],
type_,
value_,
traceback_,
message,
self.model_or_sim.simulation_data.debug,
)
self._filename = datautil.clean_filename(
str(filename).replace("\\", "/")
)
self.path, self.structure = self.model_or_sim.register_package(
self, not loading_package, pname is None, filename is None
)
self.dimensions = self.create_package_dimensions()
if self.path is None:
if (
self._simulation_data.verbosity_level.value
>= VerbosityLevel.normal.value
):
print(
"WARNING: Package type {} failed to register property."
" {}".format(self._package_type, self.path)
)
if self.parent_file is not None:
self.container_type.append(PackageContainerType.package)
# init variables that may be used later
self.post_block_comments = None
self.last_error = None
self.bc_color = "black"
self.__inattr = False
self._child_package_groups = {}
child_builder_call = kwargs.pop("child_builder_call", None)
if (
self.parent_file is not None
and child_builder_call is None
and package_type in self.parent_file._child_package_groups
):
# initialize as part of the parent's child package group
chld_pkg_grp = self.parent_file._child_package_groups[package_type]
chld_pkg_grp.init_package(self, self._filename, False)
# remove any remaining valid kwargs
key_list = list(kwargs.keys())
for key in key_list:
if "filerecord" in key and hasattr(self, f"{key}"):
kwargs.pop(f"{key}")
# check for extraneous kwargs
if len(kwargs) > 0:
kwargs_str = ", ".join(kwargs.keys())
excpt_str = (
f'Extraneous kwargs "{kwargs_str}" provided to MFPackage.'
)
raise FlopyException(excpt_str)
def __init_subclass__(cls):
"""Register package type"""
super().__init_subclass__()
PackageContainer.modflow_packages.append(cls)
PackageContainer.packages_by_abbr[cls.package_abbr] = cls
def __setattr__(self, name, value):
if hasattr(self, name) and getattr(self, name) is not None:
attribute = object.__getattribute__(self, name)
if attribute is not None and isinstance(attribute, mfdata.MFData):
try:
if isinstance(attribute, mfdatalist.MFList):
attribute.set_data(value, autofill=True)
else:
attribute.set_data(value)
except MFDataException as mfde:
raise MFDataException(
mfdata_except=mfde,
model=self.model_name,
package=self._get_pname(),
)
return
if all(
hasattr(self, attr) for attr in ["model_or_sim", "_package_type"]
):
if hasattr(self.model_or_sim, "_mg_resync"):
if not self.model_or_sim._mg_resync:
self.model_or_sim._mg_resync = self._mg_resync
super().__setattr__(name, value)
def __repr__(self):
return self._get_data_str(True)
def __str__(self):
return self._get_data_str(False)
@property
def filename(self):
"""Package's file name."""
return self._filename
@property
def quoted_filename(self):
"""Package's file name with quotes if there is a space."""
if " " in self._filename:
return f'"{self._filename}"'
return self._filename
@filename.setter
def filename(self, fname):
"""Package's file name."""
if (
isinstance(self.parent_file, MFPackage)
and self.package_type in self.parent_file._child_package_groups
):
fname = datautil.clean_filename(fname)
try:
child_pkg_group = self.parent_file._child_package_groups[
self.structure.file_type
]
child_pkg_group._update_filename(self._filename, fname)
except Exception:
print(
"WARNING: Unable to update file name for parent"
f"package of {self.package_name}."
)
if self.model_or_sim is not None and fname is not None:
if self._package_type != "nam":
self.model_or_sim.update_package_filename(self, fname)
self._filename = fname
@property
def package_type(self):
"""String describing type of package"""
return self._package_type
@property
def name(self):
"""Name of package"""
return [self.package_name]
@name.setter
def name(self, name):
"""Name of package"""
self.package_name = name
@property
def parent(self):
"""Parent package"""
return self.model_or_sim
@parent.setter
def parent(self, parent):
"""Parent package"""
assert False, "Do not use this setter to set the parent"
@property
def plottable(self):
"""If package is plottable"""
if self.model_or_sim.type == "Simulation":
return False
else:
return True
@property
def output(self):
"""
Method to get output associated with a specific package
Returns
-------
MF6Output object
"""
return MF6Output(self)
@property
def data_list(self):
"""List of data in this package."""
# return [data_object, data_object, ...]
return self._data_list
def _add_package(self, package, path):
pkg_type = package.package_type.lower()
if pkg_type in self.package_type_dict:
for existing_pkg in self.package_type_dict[pkg_type]:
if existing_pkg is package:
# do not add the same package twice
return
super()._add_package(package, path)
def _get_aux_data(self, aux_names):
if hasattr(self, "stress_period_data"):
spd = self.stress_period_data.get_data()
if (
0 in spd
and spd[0] is not None
and aux_names[0][1] in spd[0].dtype.names
):
return spd
if hasattr(self, "packagedata"):
pd = self.packagedata.get_data()
if aux_names[0][1] in pd.dtype.names:
return pd
if hasattr(self, "perioddata"):
spd = self.perioddata.get_data()
if (
0 in spd
and spd[0] is not None
and aux_names[0][1] in spd[0].dtype.names
):
return spd
if hasattr(self, "aux"):
return self.aux.get_data()
return None
def _boundnames_active(self):
if hasattr(self, "boundnames"):
if self.boundnames.get_data():
return True
return False
[docs]
def check(self, f=None, verbose=True, level=1, checktype=None):
"""Data check, returns True on success."""
if checktype is None:
checktype = mf6check
# do general checks
chk = super().check(f, verbose, level, checktype)
# do mf6 specific checks
if hasattr(self, "auxiliary"):
# auxiliary variable check
# check if auxiliary variables are defined
aux_names = self.auxiliary.get_data()
if aux_names is not None and len(aux_names[0]) > 1:
num_aux_names = len(aux_names[0]) - 1
# check for stress period data
aux_data = self._get_aux_data(aux_names)
if aux_data is not None and len(aux_data) > 0:
# make sure the check object exists
if chk is None:
chk = self._get_check(f, verbose, level, checktype)
if isinstance(aux_data, dict):
aux_datasets = list(aux_data.values())
else:
aux_datasets = [aux_data]
dataset_type = "unknown"
for dataset in aux_datasets:
if isinstance(dataset, np.recarray):
dataset_type = "recarray"
break
elif isinstance(dataset, np.ndarray):
dataset_type = "ndarray"
break
# if aux data is in a list
if dataset_type == "recarray":
# check for time series data
time_series_name_dict = {}
if hasattr(self, "ts") and hasattr(
self.ts, "time_series_namerecord"
):
# build dictionary of time series data variables
ts_nr = self.ts.time_series_namerecord.get_data()
if ts_nr is not None:
for item in ts_nr:
if len(item) > 0 and item[0] is not None:
time_series_name_dict[item[0]] = True
# auxiliary variables are last unless boundnames
# defined, then second to last
if self._boundnames_active():
offset = 1
else:
offset = 0
# loop through stress period datasets with aux data
for data in aux_datasets:
if isinstance(data, np.recarray):
for row in data:
row_size = len(row)
aux_start_loc = (
row_size - num_aux_names - offset
)
# loop through auxiliary variables
for idx, var in enumerate(aux_names):
# get index of current aux variable
data_index = aux_start_loc + idx
# verify auxiliary value is either
# numeric or time series variable
if (
not datautil.DatumUtil.is_float(
row[data_index]
)
and not row[data_index]
in time_series_name_dict
):
desc = (
f"Invalid non-numeric "
f"value "
f"'{row[data_index]}' "
f"in auxiliary data."
)
chk._add_to_summary(
"Error",
desc=desc,
package=self.package_name,
)
# else if stress period data is arrays
elif dataset_type == "ndarray":
# loop through auxiliary stress period datasets
for data in aux_datasets:
# verify auxiliary value is either numeric or time
# array series variable
if isinstance(data, np.ndarray):
val = np.isnan(np.sum(data))
if val:
desc = (
f"One or more nan values were "
f"found in auxiliary data."
)
chk._add_to_summary(
"Warning",
desc=desc,
package=self.package_name,
)
return chk
def _get_nan_exclusion_list(self):
excl_list = []
if hasattr(self, "stress_period_data"):
spd_struct = self.stress_period_data.structure
for item_struct in spd_struct.data_item_structures:
if item_struct.optional or item_struct.keystring_dict:
excl_list.append(item_struct.name)
return excl_list
def _get_data_str(self, formal, show_data=True):
data_str = (
"package_name = {}\nfilename = {}\npackage_type = {}"
"\nmodel_or_simulation_package = {}"
"\n{}_name = {}"
"\n".format(
self._get_pname(),
self._filename,
self.package_type,
self.model_or_sim.type.lower(),
self.model_or_sim.type.lower(),
self.model_or_sim.name,
)
)
if self.parent_file is not None and formal:
data_str = (
f"{data_str}parent_file = {self.parent_file._get_pname()}\n\n"
)
else:
data_str = f"{data_str}\n"
if show_data:
for block in self.blocks.values():
if formal:
bl_repr = repr(block)
if len(bl_repr.strip()) > 0:
data_str = (
"{}Block {}\n--------------------\n{}"
"\n".format(
data_str, block.structure.name, repr(block)
)
)
else:
bl_str = str(block)
if len(bl_str.strip()) > 0:
data_str = (
"{}Block {}\n--------------------\n{}"
"\n".format(
data_str, block.structure.name, str(block)
)
)
return data_str
def _get_pname(self):
if self.package_name is not None:
return str(self.package_name)
else:
return str(self._filename)
def _get_block_header_info(self, line, path):
# init
header_variable_strs = []
arr_clean_line = line.strip().split()
header_comment = MFComment(
"", path + (arr_clean_line[1],), self._simulation_data, 0
)
# break header into components
if len(arr_clean_line) < 2:
message = (
"Block header does not contain a name. Name "
'expected in line "{}".'.format(line)
)
type_, value_, traceback_ = sys.exc_info()
raise MFDataException(
self.model_name,
self._get_pname(),
self.path,
"parsing block header",
None,
inspect.stack()[0][3],
type_,
value_,
traceback_,
message,
self._simulation_data.debug,
)
elif len(arr_clean_line) == 2:
return MFBlockHeader(
arr_clean_line[1],
header_variable_strs,
header_comment,
self._simulation_data,
path,
)
else:
# process text after block name
comment = False
for entry in arr_clean_line[2:]:
# if start of comment
if MFComment.is_comment(entry.strip()[0]):
comment = True
if comment:
header_comment.text = " ".join(
[header_comment.text, entry]
)
else:
header_variable_strs.append(entry)
return MFBlockHeader(
arr_clean_line[1],
header_variable_strs,
header_comment,
self._simulation_data,
path,
)
def _update_size_defs(self):
# build temporary data lookup by name
data_lookup = {}
for block in self.blocks.values():
for dataset in block.datasets.values():
data_lookup[dataset.structure.name] = dataset
# loop through all data
for block in self.blocks.values():
for dataset in block.datasets.values():
# if data shape is 1-D
if (
dataset.structure.shape
and len(dataset.structure.shape) == 1
):
# if shape name is data in this package
if dataset.structure.shape[0] in data_lookup:
size_def = data_lookup[dataset.structure.shape[0]]
size_def_name = size_def.structure.name
if isinstance(dataset, mfdata.MFTransient):
# for transient data always use the maximum size
new_size = -1
for key in dataset.get_active_key_list():
try:
data = dataset.get_data(key=key[0])
except (OSError, MFDataException):
# TODO: Handle case where external file
# path has been moved
data = None
if data is not None:
data_len = len(data)
if data_len > new_size:
new_size = data_len
else:
# for all other data set max to size
new_size = -1
try:
data = dataset.get_data()
except (OSError, MFDataException):
# TODO: Handle case where external file
# path has been moved
data = None
if data is not None:
new_size = len(dataset.get_data())
if size_def.get_data() is None:
current_size = 0
else:
current_size = size_def.get_data()
if new_size > current_size:
# store current size
size_def.set_data(new_size)
# informational message to the user
if (
self._simulation_data.verbosity_level.value
>= VerbosityLevel.normal.value
):
print(
"INFORMATION: {} in {} changed to {} "
"based on size of {}".format(
size_def_name,
size_def.structure.path[:-1],
new_size,
dataset.structure.name,
)
)
[docs]
def inspect_cells(self, cell_list, stress_period=None):
"""
Inspect model cells. Returns package data associated with cells.
Parameters
----------
cell_list : list of tuples
List of model cells. Each model cell is a tuple of integers.
ex: [(1,1,1), (2,4,3)]
stress_period : int
For transient data, only return data from this stress period. If
not specified or None, all stress period data will be returned.
Returns
-------
output : array
Array containing inspection results
"""
data_found = []
# loop through blocks
local_index_names = []
local_index_blocks = []
local_index_values = []
local_index_cellids = []
# loop through blocks in package
for block in self.blocks.values():
# loop through data in block
for dataset in block.datasets.values():
if isinstance(dataset, mfdatalist.MFList):
# handle list data
cellid_column = None
local_index_name = None
# loop through list data column definitions
for index, data_item in enumerate(
dataset.structure.data_item_structures
):
if index == 0 and data_item.type == DatumType.integer:
local_index_name = data_item.name
# look for cellid column in list data row
if isinstance(data_item, MFDataItemStructure) and (
data_item.is_cellid or data_item.possible_cellid
):
cellid_column = index
break
if cellid_column is not None:
data_output = DataSearchOutput(dataset.path)
local_index_vals = []
local_index_cells = []
# get data
if isinstance(dataset, mfdatalist.MFTransientList):
# data may be in multiple transient blocks, get
# data from appropriate blocks
main_data = dataset.get_data(stress_period)
if stress_period is not None:
main_data = {stress_period: main_data}
else:
# data is all in one block, get data
main_data = {-1: dataset.get_data()}
# loop through each dataset
for key, value in main_data.items():
if value is None:
continue
if data_output.data_header is None:
data_output.data_header = value.dtype.names
# loop through list data rows
for line in value:
# loop through list of cells we are searching
# for
for cell in cell_list:
if isinstance(
line[cellid_column], tuple
) and cellids_equal(
line[cellid_column], cell
):
# save data found
data_output.data_entries.append(line)
data_output.data_entry_ids.append(cell)
data_output.data_entry_stress_period.append(
key
)
if datautil.DatumUtil.is_int(line[0]):
# save index data for further
# processing. assuming index is
# always first entry
local_index_vals.append(line[0])
local_index_cells.append(cell)
if (
local_index_name is not None
and len(local_index_vals) > 0
):
# capture index lookups for scanning related data
local_index_names.append(local_index_name)
local_index_blocks.append(block.path[-1])
local_index_values.append(local_index_vals)
local_index_cellids.append(local_index_cells)
if len(data_output.data_entries) > 0:
data_found.append(data_output)
elif isinstance(dataset, mfdataarray.MFArray):
# handle array data
data_shape = copy.deepcopy(
dataset.structure.data_item_structures[0].shape
)
if dataset.path[-1] == "top":
# top is a special case where the two datasets
# need to be combined to get the correct layer top
model_grid = self.model_or_sim.modelgrid
main_data = {-1: model_grid.top_botm}
data_shape.append("nlay")
else:
if isinstance(dataset, mfdataarray.MFTransientArray):
# data may be in multiple blocks, get data from
# appropriate blocks
main_data = dataset.get_data(stress_period)
if stress_period is not None:
main_data = {stress_period: main_data}
else:
# data is all in one block, get a process data
main_data = {-1: dataset.get_data()}
if main_data is None:
continue
data_output = DataSearchOutput(dataset.path)
# loop through datasets
for key, array_data in main_data.items():
if array_data is None:
continue
self.model_or_sim.match_array_cells(
cell_list, data_shape, array_data, key, data_output
)
if len(data_output.data_entries) > 0:
data_found.append(data_output)
if len(local_index_names) > 0:
# look for data that shares the index value with data found
# for example a shared well or reach number
for block in self.blocks.values():
# loop through data
for dataset in block.datasets.values():
if isinstance(dataset, mfdatalist.MFList):
data_item = dataset.structure.data_item_structures[0]
data_output = DataSearchOutput(dataset.path)
# loop through previous data found
for (
local_index_name,
local_index_vals,
cell_ids,
local_block_name,
) in zip(
local_index_names,
local_index_values,
local_index_cellids,
local_index_blocks,
):
if local_block_name == block.path[-1]:
continue
if (
isinstance(data_item, MFDataItemStructure)
and data_item.name == local_index_name
and data_item.type == DatumType.integer
):
# matching data index type found, get data
if isinstance(
dataset, mfdatalist.MFTransientList
):
# data may be in multiple blocks, get data
# from appropriate blocks
main_data = dataset.get_data(stress_period)
if stress_period is not None:
main_data = {stress_period: main_data}
else:
# data is all in one block
main_data = {-1: dataset.get_data()}
# loop through the data
for key, value in main_data.items():
if value is None:
continue
if data_output.data_header is None:
data_output.data_header = (
value.dtype.names
)
# loop through each row of data
for line in value:
# loop through the index values we are
# looking for
for index_val, cell_id in zip(
local_index_vals, cell_ids
):
# try to match index values we are
# looking for to the data
if index_val == line[0]:
# save data found
data_output.data_entries.append(
line
)
data_output.data_entry_ids.append(
index_val
)
data_output.data_entry_cellids.append(
cell_id
)
data_output.data_entry_stress_period.append(
key
)
if len(data_output.data_entries) > 0:
data_found.append(data_output)
return data_found
[docs]
def remove(self):
"""Removes this package from the simulation/model it is currently a
part of.
"""
self.model_or_sim.remove_package(self)
[docs]
def build_child_packages_container(self, pkg_type, filerecord):
"""Builds a container object for any child packages. This method is
only intended for FloPy internal use."""
# get package class
package_obj = self.package_factory(
pkg_type, self.model_or_sim.model_type
)
# create child package object
child_pkgs_name = f"utl{pkg_type}packages"
child_pkgs_obj = self.package_factory(child_pkgs_name, "")
if child_pkgs_obj is None and self.model_or_sim.model_type is None:
# simulation level object, try just the package type in the name
child_pkgs_name = f"{pkg_type}packages"
child_pkgs_obj = self.package_factory(child_pkgs_name, "")
if child_pkgs_obj is None:
# see if the package is part of one of the supported model types
for model_type in MFStructure().sim_struct.model_types:
child_pkgs_name = f"{model_type}{pkg_type}packages"
child_pkgs_obj = self.package_factory(child_pkgs_name, "")
if child_pkgs_obj is not None:
break
child_pkgs = child_pkgs_obj(
self.model_or_sim, self, pkg_type, filerecord, None, package_obj
)
setattr(self, pkg_type, child_pkgs)
self._child_package_groups[pkg_type] = child_pkgs
def _get_dfn_name_dict(self):
dfn_name_dict = {}
item_num = 0
for item in self.structure.dfn_list:
if len(item) > 1:
item_name = item[1].split()
if len(item_name) > 1 and item_name[0] == "name":
dfn_name_dict[item_name[1]] = item_num
item_num += 1
return dfn_name_dict
[docs]
def build_child_package(self, pkg_type, data, parameter_name, filerecord):
"""Builds a child package. This method is only intended for FloPy
internal use."""
if not hasattr(self, pkg_type):
self.build_child_packages_container(pkg_type, filerecord)
if data is not None:
package_group = getattr(self, pkg_type)
# build child package file name
child_path = package_group._next_default_file_path()
# create new empty child package
package_obj = self.package_factory(
pkg_type, self.model_or_sim.model_type
)
package = package_obj(
self, filename=child_path, child_builder_call=True
)
assert hasattr(package, parameter_name)
if isinstance(data, dict):
# order data correctly
dfn_name_dict = package._get_dfn_name_dict()
ordered_data_items = []
for key, value in data.items():
if key in dfn_name_dict:
ordered_data_items.append(
[dfn_name_dict[key], key, value]
)
else:
ordered_data_items.append([999999, key, value])
ordered_data_items = sorted(
ordered_data_items, key=lambda x: x[0]
)
# evaluate and add data to package
unused_data = {}
for order, key, value in ordered_data_items:
# if key is an attribute of the child package
if isinstance(key, str) and hasattr(package, key):
# set child package attribute
child_data_attr = getattr(package, key)
if isinstance(child_data_attr, mfdatalist.MFList):
child_data_attr.set_data(value, autofill=True)
elif isinstance(child_data_attr, mfdata.MFData):
child_data_attr.set_data(value)
elif key == "fname" or key == "filename":
child_path = value
package._filename = value
else:
setattr(package, key, value)
else:
unused_data[key] = value
if unused_data:
setattr(package, parameter_name, unused_data)
else:
setattr(package, parameter_name, data)
# append package to list
package_group.init_package(package, child_path)
return package
[docs]
def build_mfdata(self, var_name, data=None):
"""Returns the appropriate data type object (mfdatalist, mfdataarray,
or mfdatascalar) given that object the appropriate structure (looked
up based on var_name) and any data supplied. This method is for
internal FloPy library use only.
Parameters
----------
var_name : str
Variable name
data : many supported types
Data contained in this object
Returns
-------
data object : MFData subclass
"""
if self.loading_package:
data = None
for key, block in self.structure.blocks.items():
if var_name in block.data_structures:
if block.name not in self.blocks:
self.blocks[block.name] = MFBlock(
self._simulation_data,
self.dimensions,
block,
self.path + (key,),
self.model_or_sim,
self,
)
dataset_struct = block.data_structures[var_name]
var_path = self.path + (key, var_name)
ds = self.blocks[block.name].add_dataset(
dataset_struct, data, var_path
)
self._data_list.append(ds)
return ds
message = 'Unable to find variable "{}" in package ' '"{}".'.format(
var_name, self.package_type
)
type_, value_, traceback_ = sys.exc_info()
raise MFDataException(
self.model_name,
self._get_pname(),
self.path,
"building data objects",
None,
inspect.stack()[0][3],
type_,
value_,
traceback_,
message,
self._simulation_data.debug,
)
[docs]
def set_model_relative_path(self, model_ws):
"""Sets the model path relative to the simulation's path.
Parameters
----------
model_ws : str
Model path relative to the simulation's path.
"""
# update blocks
for key, block in self.blocks.items():
block.set_model_relative_path(model_ws)
# update sub-packages
for package in self._packagelist:
package.set_model_relative_path(model_ws)
[docs]
def set_all_data_external(
self, check_data=True, external_data_folder=None
):
"""Sets the package's list and array data to be stored externally.
Parameters
----------
check_data : bool
Determine if data error checking is enabled
external_data_folder
Folder where external data will be stored
"""
# set blocks
for key, block in self.blocks.items():
file_name = os.path.split(self.filename)[1]
block.set_all_data_external(
file_name, check_data, external_data_folder
)
# set sub-packages
for package in self._packagelist:
package.set_all_data_external(check_data, external_data_folder)
[docs]
def set_all_data_internal(self, check_data=True):
"""Sets the package's list and array data to be stored internally.
Parameters
----------
check_data : bool
Determine if data error checking is enabled
"""
# set blocks
for key, block in self.blocks.items():
block.set_all_data_internal(check_data)
# set sub-packages
for package in self._packagelist:
package.set_all_data_internal(check_data)
[docs]
def load(self, strict=True):
"""Loads the package from file.
Parameters
----------
strict : bool
Enforce strict checking of data.
Returns
-------
success : bool
"""
# open file
try:
fd_input_file = open(
datautil.clean_filename(self.get_file_path()), "r"
)
except OSError as e:
if e.errno == errno.ENOENT:
message = "File {} of type {} could not be opened.".format(
self.get_file_path(), self.package_type
)
type_, value_, traceback_ = sys.exc_info()
raise MFDataException(
self.model_name,
self.package_name,
self.path,
"loading package file",
None,
inspect.stack()[0][3],
type_,
value_,
traceback_,
message,
self._simulation_data.debug,
)
try:
self._load_blocks(fd_input_file, strict)
except ReadAsArraysException as err:
fd_input_file.close()
raise ReadAsArraysException(err)
# close file
fd_input_file.close()
if self.simulation_data.auto_set_sizes:
self._update_size_defs()
# return validity of file
return self.is_valid()
[docs]
def is_valid(self):
"""Returns whether or not this package is valid.
Returns
-------
is valid : bool
"""
# Check blocks
for block in self.blocks.values():
# Non-optional blocks must be enabled
if (
block.structure.number_non_optional_data() > 0
and not block.enabled
and block.is_allowed()
):
self.last_error = (
f'Required block "{block.block_header.name}" not enabled'
)
return False
# Enabled blocks must be valid
if block.enabled and not block.is_valid:
self.last_error = f'Invalid block "{block.block_header.name}"'
return False
return True
def _load_blocks(self, fd_input_file, strict=True, max_blocks=sys.maxsize):
# init
self._simulation_data.mfdata[
self.path + ("pkg_hdr_comments",)
] = MFComment("", self.path, self._simulation_data)
self.post_block_comments = MFComment(
"", self.path, self._simulation_data
)
blocks_read = 0
found_first_block = False
line = " "
while line != "":
line = fd_input_file.readline()
clean_line = line.strip()
# If comment or empty line
if MFComment.is_comment(clean_line, True):
self._store_comment(line, found_first_block)
elif len(clean_line) > 4 and clean_line[:5].upper() == "BEGIN":
# parse block header
try:
block_header_info = self._get_block_header_info(
line, self.path
)
except MFDataException as mfde:
message = (
"An error occurred while loading block header "
'in line "{}".'.format(line)
)
type_, value_, traceback_ = sys.exc_info()
raise MFDataException(
self.model_name,
self._get_pname(),
self.path,
"loading block header",
None,
inspect.stack()[0][3],
type_,
value_,
traceback_,
message,
self._simulation_data.debug,
mfde,
)
# if there is more than one possible block with the same name,
# resolve the correct block to use
block_key = block_header_info.name.lower()
block_num = 1
possible_key = f"{block_header_info.name.lower()}-{block_num}"
if possible_key in self.blocks:
block_key = possible_key
block_header_name = block_header_info.name.lower()
while (
block_key in self.blocks
and not self.blocks[block_key].is_allowed()
):
block_key = f"{block_header_name}-{block_num}"
block_num += 1
if block_key not in self.blocks:
# block name not recognized, load block as comments and
# issue a warning
if (
self.simulation_data.verbosity_level.value
>= VerbosityLevel.normal.value
):
warning_str = (
'WARNING: Block "{}" is not a valid block '
"name for file type "
"{}.".format(block_key, self.package_type)
)
print(warning_str)
self._store_comment(line, found_first_block)
while line != "":
line = fd_input_file.readline()
self._store_comment(line, found_first_block)
arr_line = datautil.PyListUtil.split_data_line(line)
if arr_line and (
len(arr_line[0]) <= 2
or arr_line[0][:3].upper() == "END"
):
break
else:
found_first_block = True
skip_block = False
cur_block = self.blocks[block_key]
if cur_block.loaded:
# Only blocks defined as repeating are allowed to have
# multiple entries
header_name = block_header_info.name
if not self.structure.blocks[
header_name.lower()
].repeating():
# warn and skip block
if (
self._simulation_data.verbosity_level.value
>= VerbosityLevel.normal.value
):
warning_str = (
'WARNING: Block "{}" has '
"multiple entries and is not "
"intended to be a repeating "
"block ({} package"
")".format(header_name, self.package_type)
)
print(warning_str)
skip_block = True
bhs = cur_block.structure.block_header_structure
bhval = block_header_info.variable_strings
if (
len(bhs) > 0
and len(bhval) > 0
and bhs[0].name == "iper"
):
nper = self._simulation_data.mfdata[
("tdis", "dimensions", "nper")
].get_data()
bhval_int = datautil.DatumUtil.is_int(bhval[0])
if not bhval_int or int(bhval[0]) > nper:
# skip block when block stress period is greater
# than nper
skip_block = True
if not skip_block:
if (
self.simulation_data.verbosity_level.value
>= VerbosityLevel.verbose.value
):
print(
f" loading block {cur_block.structure.name}..."
)
# reset comments
self.post_block_comments = MFComment(
"", self.path, self._simulation_data
)
cur_block.load(
block_header_info, fd_input_file, strict
)
# write post block comment comment
self._simulation_data.mfdata[
cur_block.block_headers[-1].blk_post_comment_path
] = self.post_block_comments
blocks_read += 1
if blocks_read >= max_blocks:
break
else:
# treat skipped block as if it is all comments
arr_line = datautil.PyListUtil.split_data_line(
clean_line
)
self.post_block_comments.add_text(str(line), True)
while arr_line and (
len(line) <= 2 or arr_line[0][:3].upper() != "END"
):
line = fd_input_file.readline()
arr_line = datautil.PyListUtil.split_data_line(
line.strip()
)
if arr_line:
self.post_block_comments.add_text(
str(line), True
)
self._simulation_data.mfdata[
cur_block.block_headers[-1].blk_post_comment_path
] = self.post_block_comments
else:
if not (
len(clean_line) == 0
or (len(line) > 2 and line[:3].upper() == "END")
):
# Record file location of beginning of unresolved text
# treat unresolved text as a comment for now
self._store_comment(line, found_first_block)
[docs]
def write(self, ext_file_action=ExtFileAction.copy_relative_paths):
"""Writes the package to a file.
Parameters
----------
ext_file_action : ExtFileAction
How to handle pathing of external data files.
"""
if self.simulation_data.auto_set_sizes:
self._update_size_defs()
# create any folders in path
package_file_path = self.get_file_path()
package_folder = os.path.split(package_file_path)[0]
if package_folder and not os.path.isdir(package_folder):
os.makedirs(os.path.split(package_file_path)[0])
# open file
fd = open(package_file_path, "w")
# write flopy header
if self.simulation_data.write_headers:
dt = datetime.datetime.now()
header = (
"# File generated by Flopy version {} on {} at {}."
"\n".format(
__version__,
dt.strftime("%m/%d/%Y"),
dt.strftime("%H:%M:%S"),
)
)
fd.write(header)
# write blocks
self._write_blocks(fd, ext_file_action)
fd.close()
[docs]
def create_package_dimensions(self):
"""Creates a package dimensions object. For internal FloPy library
use.
Returns
-------
package dimensions : PackageDimensions
"""
model_dims = None
if self.container_type[0] == PackageContainerType.model:
model_dims = [
modeldimensions.ModelDimensions(
self.path[0], self._simulation_data
)
]
else:
# this is a simulation file that does not correspond to a specific
# model. figure out which model to use and return a dimensions
# object for that model
if self.dfn_file_name[0:3] == "exg":
exchange_rec_array = self._simulation_data.mfdata[
("nam", "exchanges", "exchanges")
].get_data()
if exchange_rec_array is None:
return None
for exchange in exchange_rec_array:
if exchange[1].lower() == self._filename.lower():
model_dims = [
modeldimensions.ModelDimensions(
exchange[2], self._simulation_data
),
modeldimensions.ModelDimensions(
exchange[3], self._simulation_data
),
]
break
elif (
self.dfn_file_name[4:7] == "gnc"
and self.model_or_sim.type == "Simulation"
):
# get exchange file name associated with gnc package
if self.parent_file is not None:
exg_file_name = self.parent_file.filename
else:
raise Exception(
"Can not create a simulation-level "
"gnc file without a corresponding "
"exchange file. Exchange file must be "
"created first."
)
# get models associated with exchange file from sim nam file
try:
exchange_recarray_data = (
self.model_or_sim.name_file.exchanges.get_data()
)
except MFDataException as mfde:
message = (
"An error occurred while retrieving exchange "
"data from the simulation name file. The error "
"occurred while processing gnc file "
f'"{self.filename}".'
)
raise MFDataException(
mfdata_except=mfde,
package=self._get_pname(),
message=message,
)
assert exchange_recarray_data is not None
model_1 = None
model_2 = None
for exchange in exchange_recarray_data:
if exchange[1] == exg_file_name:
model_1 = exchange[2]
model_2 = exchange[3]
# assign models to gnc package
model_dims = [
modeldimensions.ModelDimensions(
model_1, self._simulation_data
),
modeldimensions.ModelDimensions(
model_2, self._simulation_data
),
]
elif self.parent_file is not None:
model_dims = []
for md in self.parent_file.dimensions.model_dim:
model_name = md.model_name
model_dims.append(
modeldimensions.ModelDimensions(
model_name, self._simulation_data
)
)
else:
model_dims = [
modeldimensions.ModelDimensions(
None, self._simulation_data
)
]
return modeldimensions.PackageDimensions(
model_dims, self.structure, self.path
)
def _store_comment(self, line, found_first_block):
# Store comment
if found_first_block:
self.post_block_comments.text += line
else:
self._simulation_data.mfdata[
self.path + ("pkg_hdr_comments",)
].text += line
def _write_blocks(self, fd, ext_file_action):
# verify that all blocks are valid
if not self.is_valid():
message = (
'Unable to write out model file "{}" due to the '
"following error: "
"{} ({})".format(self._filename, self.last_error, self.path)
)
type_, value_, traceback_ = sys.exc_info()
raise MFDataException(
self.model_name,
self._get_pname(),
self.path,
"writing package blocks",
None,
inspect.stack()[0][3],
type_,
value_,
traceback_,
message,
self._simulation_data.debug,
)
# write initial comments
pkg_hdr_comments_path = self.path + ("pkg_hdr_comments",)
if pkg_hdr_comments_path in self._simulation_data.mfdata:
self._simulation_data.mfdata[
self.path + ("pkg_hdr_comments",)
].write(fd, False)
# loop through blocks
block_num = 1
for block in self.blocks.values():
if (
self.simulation_data.verbosity_level.value
>= VerbosityLevel.verbose.value
):
print(f" writing block {block.structure.name}...")
# write block
block.write(fd, ext_file_action=ext_file_action)
block_num += 1
[docs]
def get_file_path(self):
"""Returns the package file's path.
Returns
-------
file path : str
"""
if self.path[0] in self._simulation_data.mfpath.model_relative_path:
return os.path.join(
self._simulation_data.mfpath.get_model_path(self.path[0]),
self._filename,
)
else:
return os.path.join(
self._simulation_data.mfpath.get_sim_path(), self._filename
)
[docs]
def export(self, f, **kwargs):
"""
Method to export a package to netcdf or shapefile based on the
extension of the file name (.shp for shapefile, .nc for netcdf)
Parameters
----------
f : str
Filename
kwargs : keyword arguments
modelgrid : flopy.discretization.Grid instance
User supplied modelgrid which can be used for exporting
in lieu of the modelgrid associated with the model object
Returns
-------
None or Netcdf object
"""
from .. import export
return export.utils.package_export(f, self, **kwargs)
[docs]
def plot(self, **kwargs):
"""
Plot 2-D, 3-D, transient 2-D, and stress period list (MfList)
package input data
Parameters
----------
**kwargs : dict
filename_base : str
Base file name that will be used to automatically generate
file names for output image files. Plots will be exported as
image files if file_name_base is not None. (default is None)
file_extension : str
Valid matplotlib.pyplot file extension for savefig(). Only
used if filename_base is not None. (default is 'png')
mflay : int
MODFLOW zero-based layer number to return. If None, then all
all layers will be included. (default is None)
kper : int
MODFLOW zero-based stress period number to return. (default is
zero)
key : str
MfList dictionary key. (default is None)
Returns
----------
axes : list
Empty list is returned if filename_base is not None. Otherwise
a list of matplotlib.pyplot.axis are returned.
"""
from ..plot.plotutil import PlotUtilities
if not self.plottable:
raise TypeError("Simulation level packages are not plottable")
axes = PlotUtilities._plot_package_helper(self, **kwargs)
return axes
[docs]
class MFChildPackages:
"""
Behind the scenes code for creating an interface to access child packages
from a parent package. This class is automatically constructed by the
FloPy library and is for internal library use only.
Parameters
----------
"""
def __init__(
self,
model_or_sim,
parent,
pkg_type,
filerecord,
package=None,
package_class=None,
):
self._packages = []
self._filerecord = filerecord
if package is not None:
self._packages.append(package)
self._model_or_sim = model_or_sim
self._cpparent = parent
self._pkg_type = pkg_type
self._package_class = package_class
def __init_subclass__(cls):
"""Register package"""
super().__init_subclass__()
PackageContainer.modflow_packages.append(cls)
PackageContainer.packages_by_abbr[cls.package_abbr] = cls
def __getattr__(self, attr):
if (
"_packages" in self.__dict__
and len(self._packages) > 0
and hasattr(self._packages[0], attr)
):
item = getattr(self._packages[0], attr)
return item
raise AttributeError(attr)
def __getitem__(self, k):
if isinstance(k, int):
if k < len(self._packages):
return self._packages[k]
raise ValueError(f"Package index {k} does not exist.")
def __setattr__(self, key, value):
if (
key != "_packages"
and key != "_model_or_sim"
and key != "_cpparent"
and key != "_inattr"
and key != "_filerecord"
and key != "_package_class"
and key != "_pkg_type"
):
if len(self._packages) == 0:
raise Exception(
"No {} package is currently attached to package"
" {}. Use the initialize method to create a(n) "
"{} package before attempting to access its "
"properties.".format(
self._pkg_type, self._cpparent.filename, self._pkg_type
)
)
package = self._packages[0]
setattr(package, key, value)
return
super().__setattr__(key, value)
def __default_file_path_base(self, file_path, suffix=""):
stem = os.path.split(file_path)[1]
stem_lst = stem.split(".")
file_name = ".".join(stem_lst[:-1])
if len(stem_lst) > 1:
file_ext = stem_lst[-1]
return f"{file_name}.{file_ext}{suffix}.{self._pkg_type}"
elif suffix != "":
return f"{stem}.{self._pkg_type}"
else:
return f"{stem}.{suffix}.{self._pkg_type}"
def __file_path_taken(self, possible_path):
for package in self._packages:
# Do case insensitive compare
if package.filename.lower() == possible_path.lower():
return True
return False
def _next_default_file_path(self):
possible_path = self.__default_file_path_base(self._cpparent.filename)
suffix = 0
while self.__file_path_taken(possible_path):
possible_path = self.__default_file_path_base(
self._cpparent.filename, suffix
)
suffix += 1
return possible_path
[docs]
def init_package(self, package, fname, remove_packages=True):
if remove_packages:
# clear out existing packages
self._remove_packages()
elif fname is not None:
self._remove_packages(fname)
if fname is None:
# build a file name
fname = self._next_default_file_path()
package._filename = fname
# check file record variable
found = False
fr_data = self._filerecord.get_data()
if fr_data is not None:
for line in fr_data:
if line[0] == fname:
found = True
if not found:
# append file record variable
self._filerecord.append_data([(fname,)])
# add the package to the list
self._packages.append(package)
def _update_filename(self, old_fname, new_fname):
file_record = self._filerecord.get_data()
new_file_record_data = []
if file_record is not None:
file_record_data = file_record[0]
for item in file_record_data:
base, fname = os.path.split(item)
if fname.lower() == old_fname.lower():
if base:
new_file_record_data.append(
(os.path.join(base, new_fname),)
)
else:
new_file_record_data.append((new_fname,))
else:
new_file_record_data.append((item,))
else:
new_file_record_data.append((new_fname,))
self._filerecord.set_data(new_file_record_data)
def _append_package(self, package, fname, update_frecord=True):
if fname is None:
# build a file name
fname = self._next_default_file_path()
package._filename = fname
if update_frecord:
# set file record variable
file_record = self._filerecord.get_data()
file_record_data = file_record
new_file_record_data = []
for item in file_record_data:
new_file_record_data.append((item[0],))
new_file_record_data.append((fname,))
self._filerecord.set_data(new_file_record_data)
for existing_pkg in self._packages:
if existing_pkg is package:
# do not add the same package twice
return
# add the package to the list
self._packages.append(package)
def _remove_packages(self, fname=None):
rp_list = []
for idx, package in enumerate(self._packages):
if fname is None or package.filename == fname:
self._model_or_sim.remove_package(package)
rp_list.append(idx)
for idx in reversed(rp_list):
self._packages.pop(idx)