import inspect
import os
import sys
from copy import deepcopy
from enum import Enum
from shutil import copyfile
import numpy as np
from ...utils.datautil import (
ArrayIndexIter,
DatumUtil,
MultiList,
MultiListIter,
PyListUtil,
clean_filename,
)
from ..data import mfdatautil
from ..data.mfstructure import DatumType, MFDataItemStructure
from ..mfbase import MFDataException, VerbosityLevel
from .mfdatautil import MFComment, convert_data, iterable
from .mffileaccess import MFFileAccess, MFFileAccessArray, MFFileAccessList
class DataStorageType(Enum):
    """
    Enumeration of different ways that data can be stored.
    """

    internal_array = 1
    internal_constant = 2
    external_file = 3
class DataStructureType(Enum):
    """
    Enumeration of different data structures used to store data.
    """

    ndarray = 1
    recarray = 2
    scalar = 3
class LayerStorage:
    """
    Stores a single layer of data.

    Parameters
    ----------
    data_storage : DataStorage
        Parent data storage object that this layer is contained in
    lay_indexes : int
        Index of the layer being stored
    data_storage_type : DataStorageType
        Method used to store the data
    data_type : DatumType
        Type of datum stored in this layer

    Attributes
    ----------
    internal_data : ndarray or recarray
        data being stored, if full data is being stored internally in memory
    data_const_value : int/float
        constant value of data being stored, if data is a constant
    data_storage_type : DataStorageType
        method used to store the data
    fname : str
        file name of external file containing the data
    factor : int/float
        factor to multiply the data by
    iprn : int
        print code
    binary : bool
        whether the data is stored in a binary file

    Methods
    -------
    get_data_const_val()
        gets the constant value of this layer.  data storage type for the
        layer must be "internal_constant".
    get_data() : ndarray/recarray/string
        returns the data for this layer
    set_data(data)
        sets the data being stored to "data" for this layer, replacing all
        data for the layer.

    See Also
    --------

    Notes
    -----

    Examples
    --------
    """

    def __init__(
        self,
        data_storage,
        lay_indexes,
        data_storage_type=DataStorageType.internal_array,
        data_type=None,
    ):
        self._data_storage_parent = data_storage
        self._lay_indexes = lay_indexes
        self.internal_data = None
        self.data_const_value = None
        self.data_storage_type = data_storage_type
        self.data_type = data_type
        self.fname = None
        # integer data gets an integer factor so that multiplying by the
        # factor does not silently promote the data to float
        if self.data_type == DatumType.integer:
            self.factor = 1
        else:
            self.factor = 1.0
        self.iprn = None
        self.binary = False

    def set_internal_constant(self):
        """Mark this layer as storing a single constant value."""
        self.data_storage_type = DataStorageType.internal_constant

    def set_internal_array(self):
        """Mark this layer as storing a full in-memory array."""
        self.data_storage_type = DataStorageType.internal_array

    @property
    def name(self):
        # name of the data item this layer belongs to
        return self._data_storage_parent.data_dimensions.structure.name

    def __repr__(self):
        if self.data_storage_type == DataStorageType.internal_constant:
            return f"constant {self.get_data_const_val()}"
        else:
            return repr(self.get_data())

    def __str__(self):
        if self.data_storage_type == DataStorageType.internal_constant:
            return str(self.get_data_const_val())
        else:
            return str(self.get_data())

    def __getattr__(self, attr):
        # guard: if "binary" itself is missing, __init__ has not run yet
        # (e.g. during unpickling); raising here prevents infinite recursion
        if attr == "binary" or not hasattr(self, "binary"):
            raise AttributeError(attr)

        if attr == "array":
            return self._data_storage_parent.get_data(self._lay_indexes, True)
        elif attr == "__getstate__":
            raise AttributeError(attr)

    def set_data(self, data):
        """Replace this layer's data with "data"."""
        self._data_storage_parent.set_data(
            data, self._lay_indexes, [self.factor]
        )

    def get_data(self):
        """Return this layer's data (without applying the factor)."""
        return self._data_storage_parent.get_data(self._lay_indexes, False)

    def get_data_const_val(self):
        """Return the constant value stored for this layer."""
        if isinstance(self.data_const_value, list):
            return self.data_const_value[0]
        else:
            return self.data_const_value
[docs]class DataStorage:
"""
Stores and retrieves data.
Parameters
----------
sim_data : simulation data class
reference to the simulation data class
data_dimensions : data dimensions class
a data dimensions class for the data being stored
get_file_entry : method reference
method that returns the file entry for the stored data
data_storage_type : enum
how the data will be stored (internally, as a constant, as an external
file)
data_structure_type : enum
what internal type is the data stored in (ndarray, recarray, scalar)
layer_shape : int
number of data layers
layered : bool
is the data layered
layer_storage : MultiList<LayerStorage>
one or more dimensional list of LayerStorage
Attributes
----------
data_storage_type : list
list of data storage types, one for each layer
data_const_value : list
list of data constants, one for each layer
external_file_path : list
list of external file paths, one for each layer
multiplier : list
list of multipliers, one for each layer
print_format : list
list of print formats, one for each layer
data_structure_type :
what internal type is the data stored in (ndarray, recarray, scalar)
layered : bool
is the data layered
pre_data_comments : string
any comments before the start of the data
comments : dict
any comments mixed in with the data, dictionary keys are data lines
post_data_comments : string
any comments after the end of the data
Methods
-------
override_data_type : (index, data_type)
overrides the data type used in a recarray at index "index" with data
type "data_type"
get_external_file_path(layer)
gets the path to an external file for layer "layer"
get_const_val(layer)
gets the constant value of a given layer. data storage type for layer
must be "internal_constant".
has_data(layer) : bool
returns true if data exists for the specified layer, false otherwise
get_data(layer) : ndarray/recarray/string
returns the data for the specified layer
update_item(data, key_index)
updates the data in a recarray at index "key_index" with data "data".
data is a list containing all data for a single record in the
recarray. data structure type must be recarray
append_data(data)
appends data "data" to the end of a recarray. data structure type
must be recarray
set_data(data, layer=None, multiplier=[1.0])
sets the data being stored to "data" for layer "layer", replacing all
data for that layer. a multiplier can be specified.
get_active_layer_indices() : list
returns the indices of all layers expected to contain data
store_internal(data, layer=None, const=False, multiplier=[1.0])
store data "data" at layer "layer" internally
store_external(file_path, layer=None, multiplier=[1.0], print_format=None,
data=None, do_not_verify=False) store data "data" at layer "layer"
externally in file "file_path"
external_to_external(new_external_file, multiplier=None, layer=None)
copies existing external data to the new file location and points to
the new file
external_to_internal(layer_num=None, store_internal=False) :
ndarray/recarray
loads existing external data for layer "layer_num" and returns it. if
store_internal is True it also stores the data internally,
changing the storage type for "layer_num" layer to internal.
internal_to_external(new_external_file, multiplier=None, layer=None,
print_format=None)
stores existing internal data for layer "layer" to external file
"new_external_file"
read_data_from_file(layer, fd=None, multiplier=None) : (ndarray, int)
reads in data from a given file "fd" as data from layer "layer".
returns data as an ndarray along with the size of the data
to_string(val, type, is_cellid=False, possible_cellid=False)
converts data "val" of type "type" to a string. is_cellid is True if
the data type is known to be a cellid and is treated as such. when
possible_cellid is True the data is checked to see if it matches the
shape/dimensions of a cellid before using it as one.
resolve_data_size(index) : int
resolves the size of a given data element in a recarray based on the
names in the existing rec_array. assumes repeating data element
names follow the format <data_element_name>_X. returns the number of
times the data element repeats.
flatten()
converts layered data to a non-layered data
make_layered()
converts non-layered data to layered data
See Also
--------
Notes
-----
Examples
--------
"""
    def __init__(
        self,
        sim_data,
        model_or_sim,
        data_dimensions,
        get_file_entry,
        data_storage_type=DataStorageType.internal_array,
        data_structure_type=DataStructureType.ndarray,
        layer_shape=(1,),
        layered=False,
        stress_period=0,
        data_path=(),
    ):
        # references to the simulation/model context this storage belongs to
        self.data_dimensions = data_dimensions
        self._model_or_sim = model_or_sim
        self._simulation_data = sim_data
        self._get_file_entry = get_file_entry
        # recarray column type overrides, keyed by column index
        self._data_type_overrides = {}
        self._data_storage_type = data_storage_type
        self._stress_period = stress_period
        self._data_path = data_path
        # recarrays hold mixed column types, so a single datum type only
        # applies to the non-recarray structure types
        if not data_structure_type == DataStructureType.recarray:
            self._data_type = self.data_dimensions.structure.get_datum_type(
                return_enum_type=True
            )
        else:
            self._data_type = None
        # layer storage objects are created lazily by the MultiList callback
        self.layer_storage = MultiList(
            shape=layer_shape, callback=self._create_layer
        )
        self.data_structure_type = data_structure_type
        package_dim = self.data_dimensions.package_dim
        # True when this data belongs to a package contained in a model
        # (model name is the first component of the package path)
        self.in_model = (
            self.data_dimensions is not None
            and len(package_dim.package_path) > 1
            and package_dim.model_dim[0].model_name.lower()
            == package_dim.package_path[0]
        )
        if data_structure_type == DataStructureType.recarray:
            self.build_type_list(resolve_data_shape=False)
        self.layered = layered

        # initialize comments
        self.pre_data_comments = None
        self.comments = {}
def __repr__(self):
return self.get_data_str(True)
def __str__(self):
return self.get_data_str(False)
def _create_layer(self, indexes):
return LayerStorage(
self, indexes, self._data_storage_type, self._data_type
)
[docs] def flatten(self):
self.layered = False
storage_type = self.layer_storage.first_item().data_storage_type
self.layer_storage = MultiList(
mdlist=[LayerStorage(self, 0, storage_type, self._data_type)]
)
    def make_layered(self):
        """Convert non-layered data into layered data.

        Only ndarray data stored internally (constant or array) can be
        converted; other structure types and external-file data raise
        MFDataException.  No-op when the data is already layered.
        """
        if not self.layered:
            if self.data_structure_type != DataStructureType.ndarray:
                message = (
                    'Data structure type "{}" does not support '
                    "layered data.".format(self.data_structure_type)
                )
                type_, value_, traceback_ = sys.exc_info()
                raise MFDataException(
                    self.data_dimensions.structure.get_model(),
                    self.data_dimensions.structure.get_package(),
                    self.data_dimensions.structure.path,
                    "making data layered",
                    self.data_dimensions.structure.name,
                    inspect.stack()[0][3],
                    type_,
                    value_,
                    traceback_,
                    message,
                    self._simulation_data.debug,
                )
            if (
                self.layer_storage.first_item().data_storage_type
                == DataStorageType.external_file
            ):
                message = (
                    "Converting external file data into layered "
                    "data currently not support."
                )
                type_, value_, traceback_ = sys.exc_info()
                raise MFDataException(
                    self.data_dimensions.structure.get_model(),
                    self.data_dimensions.structure.get_package(),
                    self.data_dimensions.structure.path,
                    "making data layered",
                    self.data_dimensions.structure.name,
                    inspect.stack()[0][3],
                    type_,
                    value_,
                    traceback_,
                    message,
                    self._simulation_data.debug,
                )
            # keep the old single-layer storage, then rebuild layer storage
            # with one entry per layer (first data dimension)
            previous_storage = self.layer_storage.first_item()
            data = previous_storage.get_data()
            data_dim = self.get_data_dimensions(None)
            self.layer_storage = MultiList(
                shape=(data_dim[0],), callback=self._create_layer
            )
            if (
                previous_storage.data_storage_type
                == DataStorageType.internal_constant
            ):
                # a constant simply repeats on every layer
                for storage in self.layer_storage.elements():
                    storage.data_const_value = (
                        previous_storage.data_const_value
                    )
            elif (
                previous_storage.data_storage_type
                == DataStorageType.internal_array
            ):
                data_ml = MultiList(data)
                if not (
                    data_ml.get_total_size()
                    == self.layer_storage.get_total_size()
                ):
                    message = (
                        "Size of data ({}) does not match expected "
                        "value of {}"
                        ".".format(
                            data_ml.get_total_size(),
                            self.layer_storage.get_total_size(),
                        )
                    )
                    type_, value_, traceback_ = sys.exc_info()
                    raise MFDataException(
                        self.data_dimensions.structure.get_model(),
                        self.data_dimensions.structure.get_package(),
                        self.data_dimensions.structure.path,
                        "making data layered",
                        self.data_dimensions.structure.name,
                        inspect.stack()[0][3],
                        type_,
                        value_,
                        traceback_,
                        message,
                        self._simulation_data.debug,
                    )
                # distribute one slice of the original array to each layer,
                # carrying over the factor and print code
                for data_layer, storage in zip(
                    data, self.layer_storage.elements()
                ):
                    storage.internal_data = data_layer
                    storage.factor = previous_storage.factor
                    storage.iprn = previous_storage.iprn
            self.layered = True
[docs] def get_data_str(self, formal):
data_str = ""
# Assemble strings for internal array data
for index, storage in enumerate(self.layer_storage.elements()):
if self.layered:
layer_str = f"Layer_{index + 1}"
else:
layer_str = ""
if storage.data_storage_type == DataStorageType.internal_array:
if storage.internal_data is not None:
header = self._get_layer_header_str(index)
if formal:
if self.layered:
data_str = "{}{}{{{}}}\n({})\n".format(
data_str,
layer_str,
header,
repr(self.get_data((index,))),
)
else:
data_str = "{}{}{{{}}}\n({})\n".format(
data_str,
layer_str,
header,
repr(self.get_data((index,))),
)
else:
data_str = "{}{}{{{}}}\n({})\n".format(
data_str,
layer_str,
header,
str(self.get_data((index,))),
)
elif (
storage.data_storage_type == DataStorageType.internal_constant
):
if formal:
if storage.data_const_value is not None:
data_str = "{}{}{{{}}}\n".format(
data_str,
layer_str,
self._get_layer_header_str(index),
)
else:
if storage.data_const_value is not None:
data_str = "{}{}{{{}}}\n".format(
data_str,
layer_str,
self._get_layer_header_str(index),
)
return data_str
def _get_layer_header_str(self, layer):
header_list = []
if (
self.layer_storage[layer].data_storage_type
== DataStorageType.external_file
):
header_list.append(f"open/close {self.layer_storage[layer].fname}")
elif (
self.layer_storage[layer].data_storage_type
== DataStorageType.internal_constant
):
lr = self.layer_storage[layer]
header_list.append(f"constant {lr}")
else:
header_list.append("internal")
if (
self.layer_storage[layer].factor != 1.0
and self.layer_storage[layer].factor != 1
and self.data_structure_type != DataStructureType.recarray
):
header_list.append(f"factor {self.layer_storage[layer].factor}")
if self.layer_storage[layer].iprn is not None:
header_list.append(f"iprn {self.layer_storage[layer].iprn}")
if len(header_list) > 0:
return ", ".join(header_list)
else:
return ""
[docs] def init_layers(self, dimensions):
self.layer_storage = MultiList(
shape=dimensions, callback=self._create_layer
)
[docs] def add_layer(self, dimension=2):
self.layer_storage.increment_dimension(dimension, self._create_layer)
[docs] def override_data_type(self, index, data_type):
self._data_type_overrides[index] = data_type
[docs] def get_external_file_path(self, layer):
if layer is None:
return self.layer_storage[0].fname
else:
return self.layer_storage[layer].fname
    def get_const_val(self, layer=None):
        """Return the constant value stored for "layer".

        When "layer" is None the first layer's constant is returned.  The
        layer's storage type must be DataStorageType.internal_constant;
        otherwise (or when no data / an invalid layer is given) an
        MFDataException is raised.
        """
        if layer is None:
            # no layer given - validate that at least one layer exists and
            # use the first one
            if not self.layer_storage.get_total_size() >= 1:
                message = "Can not get constant value. No data is available."
                type_, value_, traceback_ = sys.exc_info()
                raise MFDataException(
                    self.data_dimensions.structure.get_model(),
                    self.data_dimensions.structure.get_package(),
                    self.data_dimensions.structure.path,
                    "getting constant value",
                    self.data_dimensions.structure.name,
                    inspect.stack()[0][3],
                    type_,
                    value_,
                    traceback_,
                    message,
                    self._simulation_data.debug,
                )
            first_item = self.layer_storage.first_item()
            if (
                not first_item.data_storage_type
                == DataStorageType.internal_constant
            ):
                message = (
                    "Can not get constant value. Storage type must be "
                    "internal_constant."
                )
                type_, value_, traceback_ = sys.exc_info()
                raise MFDataException(
                    self.data_dimensions.structure.get_model(),
                    self.data_dimensions.structure.get_package(),
                    self.data_dimensions.structure.path,
                    "getting constant value",
                    self.data_dimensions.structure.name,
                    inspect.stack()[0][3],
                    type_,
                    value_,
                    traceback_,
                    message,
                    self._simulation_data.debug,
                )
            return first_item.get_data_const_val()
        else:
            # explicit layer - validate it is inside the storage shape
            if not self.layer_storage.in_shape(layer):
                message = (
                    'Can not get constant value. Layer "{}" is not a '
                    "valid layer.".format(layer)
                )
                type_, value_, traceback_ = sys.exc_info()
                raise MFDataException(
                    self.data_dimensions.structure.get_model(),
                    self.data_dimensions.structure.get_package(),
                    self.data_dimensions.structure.path,
                    "getting constant value",
                    self.data_dimensions.structure.name,
                    inspect.stack()[0][3],
                    type_,
                    value_,
                    traceback_,
                    message,
                    self._simulation_data.debug,
                )
            if (
                not self.layer_storage[layer].data_storage_type
                == DataStorageType.internal_constant
            ):
                message = (
                    "Can not get constant value. Storage type must be "
                    "internal_constant."
                )
                type_, value_, traceback_ = sys.exc_info()
                raise MFDataException(
                    self.data_dimensions.structure.get_model(),
                    self.data_dimensions.structure.get_package(),
                    self.data_dimensions.structure.path,
                    "getting constant value",
                    self.data_dimensions.structure.name,
                    inspect.stack()[0][3],
                    type_,
                    value_,
                    traceback_,
                    message,
                    self._simulation_data.debug,
                )
            return self.layer_storage[layer].get_data_const_val()
[docs] def has_data(self, layer=None):
ret_val = self._access_data(layer, False)
return ret_val is not None and ret_val is not False
[docs] def get_data(self, layer=None, apply_mult=True, block_exists=False):
data = self._access_data(layer, True, apply_mult=apply_mult)
if data is None and block_exists:
return []
else:
return data
def _access_data(self, layer, return_data=False, apply_mult=True):
layer_check = self._resolve_layer(layer)
if (
self.layer_storage[layer_check].internal_data is None
and self.layer_storage[layer_check].data_storage_type
== DataStorageType.internal_array
) or (
self.layer_storage[layer_check].data_const_value is None
and self.layer_storage[layer_check].data_storage_type
== DataStorageType.internal_constant
):
return None
if (
layer is None
and (
self.data_structure_type == DataStructureType.ndarray
or self.data_structure_type == DataStructureType.scalar
)
and return_data
):
# return data from all layers
data = self._build_full_data(apply_mult)
if data is None:
if (
self.layer_storage.first_item().data_storage_type
== DataStorageType.internal_constant
):
return self.layer_storage.first_item().get_data()[0]
else:
return data
if (
self.layer_storage[layer_check].data_storage_type
== DataStorageType.external_file
):
if return_data:
return self.external_to_internal(layer)
else:
return True
else:
if (
self.data_structure_type == DataStructureType.ndarray
and self.layer_storage[layer_check].data_const_value is None
and self.layer_storage[layer_check].internal_data is None
):
return None
if not (layer is None or self.layer_storage.in_shape(layer)):
message = f'Layer "{layer}" is an invalid layer.'
type_, value_, traceback_ = sys.exc_info()
raise MFDataException(
self.data_dimensions.structure.get_model(),
self.data_dimensions.structure.get_package(),
self.data_dimensions.structure.path,
"accessing data",
self.data_dimensions.structure.name,
inspect.stack()[0][3],
type_,
value_,
traceback_,
message,
self._simulation_data.debug,
)
if layer is None:
if (
self.data_structure_type == DataStructureType.ndarray
or self.data_structure_type == DataStructureType.scalar
):
if self.data_structure_type == DataStructureType.scalar:
return (
self.layer_storage.first_item().internal_data
is not None
)
check_storage = self.layer_storage[layer_check]
return (
check_storage.data_const_value is not None
and check_storage.data_storage_type
== DataStorageType.internal_constant
) or (
check_storage.internal_data is not None
and check_storage.data_storage_type
== DataStorageType.internal_array
)
else:
if (
self.layer_storage[layer_check].data_storage_type
== DataStorageType.internal_constant
):
if return_data:
# recarray stored as a constant. currently only
# support grid-based constant recarrays. build
# a recarray of all cells
data_list = []
model_grid = self.data_dimensions.get_model_grid()
structure = self.data_dimensions.structure
package_dim = self.data_dimensions.package_dim
for cellid in model_grid.get_all_model_cells():
first_item = self.layer_storage.first_item()
data_line = (cellid,) + (
first_item.data_const_value,
)
if len(structure.data_item_structures) > 2:
# append None any expected optional data
for (
data_item_struct
) in structure.data_item_structures[2:]:
if (
data_item_struct.name
!= "boundname"
or package_dim.boundnames()
):
data_line = data_line + (None,)
data_list.append(data_line)
type_list = self.resolve_typelist(data_list)
return np.rec.array(data_list, type_list)
else:
return (
self.layer_storage[
layer_check
].data_const_value
is not None
)
else:
if return_data:
return (
self.layer_storage.first_item().internal_data
)
else:
return True
elif (
self.layer_storage[layer].data_storage_type
== DataStorageType.internal_array
):
if return_data:
return self.layer_storage[layer].internal_data
else:
return self.layer_storage[layer].internal_data is not None
elif (
self.layer_storage[layer].data_storage_type
== DataStorageType.internal_constant
):
layer_storage = self.layer_storage[layer]
if return_data:
data = self._fill_const_layer(layer)
if data is None:
if (
layer_storage.data_storage_type
== DataStructureType.internal_constant
):
return layer_storage.data_const_value[0]
else:
return data
else:
return layer_storage.data_const_value is not None
else:
if return_data:
return self.get_external(layer)
else:
return True
    def append_data(self, data):
        """Append record(s) in "data" to the end of the stored recarray.

        The data structure type must be recarray.  Records shorter or longer
        than the stored recarray are reconciled by adding placeholders or
        rebuilding the type list.
        """
        # currently only support appending to recarrays
        if not (self.data_structure_type == DataStructureType.recarray):
            message = (
                'Can not append to data structure "{}". Can only '
                "append to a recarray datastructure"
                ".".format(self.data_structure_type)
            )
            type_, value_, traceback_ = sys.exc_info()
            raise MFDataException(
                self.data_dimensions.structure.get_model(),
                self.data_dimensions.structure.get_package(),
                self.data_dimensions.structure.path,
                "appending data",
                self.data_dimensions.structure.name,
                inspect.stack()[0][3],
                type_,
                value_,
                traceback_,
                message,
                self._simulation_data.debug,
            )
        internal_data = self.layer_storage.first_item().internal_data
        if internal_data is None:
            # nothing stored yet - the appended data becomes the recarray
            if len(data[0]) != len(self._recarray_type_list):
                # rebuild type list using existing data as a guide
                self.build_type_list(data=data)
            self.set_data(np.rec.array(data, self._recarray_type_list))
        else:
            if len(self.layer_storage.first_item().internal_data[0]) < len(
                data[0]
            ):
                # Rebuild recarray to fit larger size
                count = 0
                last_count = len(data[0]) - len(internal_data[0])
                while count < last_count:
                    self._duplicate_last_item()
                    count += 1
                internal_data_list = internal_data.tolist()
                for data_item in data:
                    internal_data_list.append(data_item)
                self._add_placeholders(internal_data_list)
                self.set_data(
                    np.rec.array(internal_data_list, self._recarray_type_list)
                )
            else:
                first_item = self.layer_storage.first_item()
                if len(first_item.internal_data[0]) > len(data[0]):
                    # Add placeholders to data
                    self._add_placeholders(data)
                # same width - stack the new records onto the existing ones
                self.set_data(
                    np.hstack(
                        (
                            internal_data,
                            np.rec.array(data, self._recarray_type_list),
                        )
                    )
                )
[docs] def set_data(
self,
data,
layer=None,
multiplier=None,
key=None,
autofill=False,
check_data=False,
):
if multiplier is None:
multiplier = [1.0]
if (
self.data_structure_type == DataStructureType.recarray
or self.data_structure_type == DataStructureType.scalar
):
self._set_list(data, layer, multiplier, key, autofill, check_data)
else:
self._set_array(data, layer, multiplier, key, autofill)
    def _set_list(
        self, data, layer, multiplier, key, autofill, check_data=False
    ):
        """Store list (recarray/scalar) data, handling dict-wrapped input.

        A dict with "filename" routes to external (open/close) storage; a
        dict with "data" is unwrapped; everything else is stored internally.
        """
        if isinstance(data, dict):
            if "filename" in data:
                if "binary" in data and data["binary"]:
                    # binary list files cannot carry boundnames
                    if self.data_dimensions.package_dim.boundnames():
                        message = (
                            "Unable to store list data ({}) to a binary "
                            "file when using boundnames"
                            ".".format(self.data_dimensions.structure.name)
                        )
                        type_, value_, traceback_ = sys.exc_info()
                        raise MFDataException(
                            self.data_dimensions.structure.get_model(),
                            self.data_dimensions.structure.get_package(),
                            self.data_dimensions.structure.path,
                            "writing list data to binary file",
                            self.data_dimensions.structure.name,
                            inspect.stack()[0][3],
                            type_,
                            value_,
                            traceback_,
                            message,
                            self._simulation_data.debug,
                        )
                self.process_open_close_line(data, layer)
                return
            elif "data" in data:
                # unwrap {"data": ...} and fall through to internal storage
                data = data["data"]
        if isinstance(data, list):
            if len(data) > 0 and (
                not PyListUtil.is_iterable(data[0]) or isinstance(data[0], str)
            ):
                # single line of data needs to be encapsulated in a tuple
                data = [tuple(data)]
        self.store_internal(
            data,
            layer,
            False,
            multiplier,
            key=key,
            autofill=autofill,
            check_data=check_data,
        )
    def _set_array(self, data, layer, multiplier, key, autofill):
        """Store array data, trying per-layer storage first.

        Raises MFDataException when the data can be stored neither as
        multiple layers nor as a single layer.
        """
        # make a list out of a single item
        if (
            isinstance(data, int)
            or isinstance(data, float)
            or isinstance(data, str)
        ):
            data = [data]

        # check for possibility of multi-layered data
        success = False
        layer_num = 0
        if (
            layer is None
            and self.data_structure_type == DataStructureType.ndarray
            and len(data) == self.layer_storage.get_total_size()
            and not isinstance(data, dict)
        ):
            # loop through list and try to store each list entry as a layer
            success = True
            for layer_num, layer_data in enumerate(data):
                if (
                    not isinstance(layer_data, list)
                    and not isinstance(layer_data, dict)
                    and not isinstance(layer_data, np.ndarray)
                ):
                    layer_data = [layer_data]
                layer_index = self.layer_storage.nth_index(layer_num)
                success = success and self._set_array_layer(
                    layer_data, layer_index, multiplier, key
                )
        if not success:
            # try to store as a single layer
            success = self._set_array_layer(data, layer, multiplier, key)
        self.layered = bool(self.layer_storage.get_total_size() > 1)
        if not success:
            message = (
                'Unable to set data "{}" layer {}. Data is not '
                "in a valid format"
                ".".format(self.data_dimensions.structure.name, layer_num)
            )
            type_, value_, traceback_ = sys.exc_info()
            raise MFDataException(
                self.data_dimensions.structure.get_model(),
                self.data_dimensions.structure.get_package(),
                self.data_dimensions.structure.path,
                "setting array data",
                self.data_dimensions.structure.name,
                inspect.stack()[0][3],
                type_,
                value_,
                traceback_,
                message,
                self._simulation_data.debug,
            )
    def _set_array_layer(self, data, layer, multiplier, key):
        """Try to store "data" as a single array layer.

        Handles constants, dict-wrapped internal/external data, control-line
        strings ("internal"/"open/close"), and plain arrays.  Returns True
        on success, False when the data could not be stored.
        """
        # look for a single constant value
        data_type = self.data_dimensions.structure.get_datum_type(
            return_enum_type=True
        )
        if not isinstance(data, dict) and not isinstance(data, str):
            if self._calc_data_size(data, 2) == 1 and self._is_type(
                data[0], data_type
            ):
                # store data as const
                self.store_internal(data, layer, True, multiplier, key=key)
                return True

        # look for internal and open/close data
        if isinstance(data, dict):
            if "data" in data:
                if (
                    isinstance(data["data"], int)
                    or isinstance(data["data"], float)
                    or isinstance(data["data"], str)
                ):
                    # data should always be in a list/array
                    data["data"] = [data["data"]]

            if "filename" in data:
                multiplier, iprn, binary = self.process_open_close_line(
                    data, layer
                )[0:3]
                # store location to file
                self.store_external(
                    data["filename"],
                    layer,
                    [multiplier],
                    print_format=iprn,
                    binary=binary,
                    do_not_verify=True,
                )
                return True
            elif "data" in data:
                multiplier, iprn = self.process_internal_line(data)
                if len(data["data"]) == 1 and (
                    DatumUtil.is_float(data["data"][0])
                    or DatumUtil.is_int(data["data"][0])
                ):
                    # merge multiplier with single value and make constant
                    if DatumUtil.is_float(multiplier):
                        mult = 1.0
                    else:
                        mult = 1
                    self.store_internal(
                        [data["data"][0] * multiplier],
                        layer,
                        True,
                        [mult],
                        key=key,
                        print_format=iprn,
                    )
                else:
                    self.store_internal(
                        data["data"],
                        layer,
                        False,
                        [multiplier],
                        key=key,
                        print_format=iprn,
                    )
                return True
        elif isinstance(data[0], str):
            if data[0].lower() == "internal":
                # "internal <factor> [iprn]" control line; last entry is data
                multiplier, iprn = self.process_internal_line(data)
                self.store_internal(
                    data[-1],
                    layer,
                    False,
                    [multiplier],
                    key=key,
                    print_format=iprn,
                )
                return True
            elif data[0].lower() != "open/close":
                # assume open/close is just omitted
                new_data = data[:]
                new_data.insert(0, "open/close")
            else:
                new_data = data[:]
            self.process_open_close_line(new_data, layer, True)
            return True
        # try to resolve as internal array
        layer_storage = self.layer_storage[self._resolve_layer(layer)]
        if not (
            layer_storage.data_storage_type
            == DataStorageType.internal_constant
            and PyListUtil.has_one_item(data)
        ):
            # store data as is
            try:
                self.store_internal(data, layer, False, multiplier, key=key)
            except MFDataException:
                return False
            return True
        return False
[docs] def get_active_layer_indices(self):
layer_index = []
for index in self.layer_storage.indexes():
if (
self.layer_storage[index].fname is not None
or self.layer_storage[index].internal_data is not None
):
layer_index.append(index)
return layer_index
    def get_external(self, layer=None):
        """Validate "layer" for external data access.

        Raises MFDataException when "layer" is not within the storage shape.

        NOTE(review): for a valid layer this method falls through and
        implicitly returns None - no external data is actually loaded here.
        Callers in _access_data receive None; confirm whether loading was
        intended to live here or in external_to_internal.
        """
        if not (layer is None or self.layer_storage.in_shape(layer)):
            message = f'Can not get external data for layer "{layer}".'
            type_, value_, traceback_ = sys.exc_info()
            raise MFDataException(
                self.data_dimensions.structure.get_model(),
                self.data_dimensions.structure.get_package(),
                self.data_dimensions.structure.path,
                "getting external data",
                self.data_dimensions.structure.name,
                inspect.stack()[0][3],
                type_,
                value_,
                traceback_,
                message,
                self._simulation_data.debug,
            )
[docs] def store_internal(
self,
data,
layer=None,
const=False,
multiplier=None,
key=None,
autofill=False,
print_format=None,
check_data=False,
):
if multiplier is None:
multiplier = [self.get_default_mult()]
if self.data_structure_type == DataStructureType.recarray:
if (
self.layer_storage.first_item().data_storage_type
== DataStorageType.internal_constant
):
self.layer_storage.first_item().data_const_value = data
else:
self.layer_storage.first_item().data_storage_type = (
DataStorageType.internal_array
)
if data is None or isinstance(data, np.recarray):
if self._simulation_data.verify_data and check_data:
self._verify_list(data)
self.layer_storage.first_item().internal_data = data
else:
if data is None:
self.set_data(None)
self.build_type_list()
if isinstance(data, list):
# look for single strings in list that describe
# multiple items
new_data = []
for item in data:
if isinstance(item, str):
# parse possible multi-item string
new_data.append(
self._resolve_data_line(item, key)
)
else:
new_data.append(item)
data = new_data
if isinstance(data, str):
# parse possible multi-item string
data = [self._resolve_data_line(data, key)]
if (
data is not None
and check_data
and self._simulation_data.verify_data
):
# check data line length
self._check_list_length(data)
if isinstance(data, np.recarray):
self.layer_storage.first_item().internal_data = data
elif autofill and data is not None:
if isinstance(data, tuple) and isinstance(
data[0], tuple
):
# convert to list of tuples
data = list(data)
if isinstance(data, list) and DatumUtil.is_basic_type(
data[0]
):
# this is a simple list, turn it into a tuple
# inside a list so that it is interpreted
# correctly by numpy.recarray
tupled_data = ()
for data_item in data:
tupled_data += (data_item,)
data = [tupled_data]
if not isinstance(data, list):
# put data in a list format for recarray
data = [(data,)]
# auto-fill tagged keyword
structure = self.data_dimensions.structure
data_item_structs = structure.data_item_structures
if (
data_item_structs[0].tagged
and not data_item_structs[0].type
== DatumType.keyword
):
for data_index, data_entry in enumerate(data):
if (
isinstance(data_entry[0], str)
and data_entry[0].lower()
== data_item_structs[0].name.lower()
):
break
data[data_index] = (
data_item_structs[0].name.lower(),
) + data[data_index]
if data is not None:
new_data = self._build_recarray(data, key, autofill)
self.layer_storage.first_item().internal_data = (
new_data
)
elif self.data_structure_type == DataStructureType.scalar:
if data == [()]:
data = [(True,)]
self.layer_storage.first_item().internal_data = data
else:
layer, multiplier = self._store_prep(layer, multiplier)
dimensions = self.get_data_dimensions(layer)
if const:
self.layer_storage[
layer
].data_storage_type = DataStorageType.internal_constant
self.layer_storage[layer].data_const_value = [
mfdatautil.get_first_val(data)
]
else:
self.layer_storage[
layer
].data_storage_type = DataStorageType.internal_array
try:
self.layer_storage[layer].internal_data = np.reshape(
data, dimensions
)
except:
message = (
'An error occurred when reshaping data "{}" to store. '
"Expected data dimensions: {}".format(
self.data_dimensions.structure.name, dimensions
)
)
type_, value_, traceback_ = sys.exc_info()
raise MFDataException(
self.data_dimensions.structure.get_model(),
self.data_dimensions.structure.get_package(),
self.data_dimensions.structure.path,
"setting array data",
self.data_dimensions.structure.name,
inspect.stack()[0][3],
type_,
value_,
traceback_,
message,
self._simulation_data.debug,
)
self.layer_storage[layer].factor = multiplier
self.layer_storage[layer].iprn = print_format
def _resolve_data_line(self, data, key):
if len(self._recarray_type_list) > 1:
# add any missing leading keywords to the beginning of the string
data_lst = data.strip().split()
data_lst_updated = []
struct = self.data_dimensions.structure
for data_item_index, data_item in enumerate(
struct.data_item_structures
):
print(data_item)
if data_item.type == DatumType.keyword:
if data_lst[0].lower() != data_item.name.lower():
data_lst_updated.append(data_item.name)
else:
data_lst_updated.append(data_lst.pop(0))
else:
if (
struct.type == DatumType.record
and data_lst[0].lower() != data_item.name.lower()
):
data_lst_updated.append(data_item.name)
break
data_lst_updated += data_lst
# parse the string as if it is being read from a package file
file_access = MFFileAccessList(
self.data_dimensions.structure,
self.data_dimensions,
self._simulation_data,
self._data_path,
self._stress_period,
)
data_loaded = []
data_out = file_access.load_list_line(
self,
data_lst_updated,
0,
data_loaded,
False,
current_key=key,
data_line=data,
zero_based=True,
)[1]
return tuple(data_out)
return data
def _get_min_record_entries(self, data=None):
try:
if isinstance(data, dict) and "data" in data:
data = data["data"]
type_list = self.build_type_list(data=data, min_size=True)
except Exception as ex:
type_, value_, traceback_ = sys.exc_info()
raise MFDataException(
self.data_dimensions.structure.get_model(),
self.data_dimensions.structure.get_package(),
self.data_dimensions.structure.path,
"getting min record entries",
self.data_dimensions.structure.name,
inspect.stack()[0][3],
type_,
value_,
traceback_,
None,
self._simulation_data.debug,
ex,
)
return len(type_list)
def _check_line_size(self, data_line, min_line_size):
if 0 < len(data_line) < min_line_size:
message = (
"Data line {} only has {} entries, "
"minimum number of entries is "
"{}.".format(data_line, len(data_line), min_line_size)
)
type_, value_, traceback_ = sys.exc_info()
raise MFDataException(
self.data_dimensions.structure.get_model(),
self.data_dimensions.structure.get_package(),
self.data_dimensions.structure.path,
"storing data",
self.data_dimensions.structure.name,
inspect.stack()[0][3],
type_,
value_,
traceback_,
message,
self._simulation_data.debug,
)
def _check_list_length(self, data_check):
    """Verify that every record in ``data_check`` contains at least
    the minimum number of entries for this list."""
    if not iterable(data_check):
        return
    # minimum number of entries any record may contain
    min_size = self._get_min_record_entries(data_check)
    first = data_check[0]
    multi_record = isinstance(first, np.record) or (
        iterable(first) and not isinstance(first, str)
    )
    if multi_record:
        # data contains multiple records; verify each one
        for record in data_check:
            self._check_line_size(record, min_size)
    else:
        # a single record
        self._check_line_size(data_check, min_size)
def _build_recarray(self, data, key, autofill):
    # Build a numpy recarray from ``data`` using this storage's
    # expected type list, normalizing cellid layout and padding
    # short records.  ``key`` is the current (stress period) key;
    # ``autofill`` relaxes mismatched field types to ``object``.
    if not mfdatautil.PyListUtil.is_iterable(data) or len(data) == 0:
        # set data to build empty recarray
        data = [()]
    self.build_type_list(data=data, key=key)
    if not self.tuple_cellids(data):
        # fix data so cellid is a single tuple
        data = self.make_tuple_cellids(data)
    if autofill and data is not None:
        # resolve any fields with data types that do not
        # agree with the expected type list
        self._resolve_multitype_fields(data)
    if isinstance(data, list):
        # data needs to be stored as tuples within a list.
        # if this is not the case try to fix it
        self._tupleize_data(data)
        # add placeholders to data so it agrees with
        # expected dimensions of recarray
        self._add_placeholders(data)
    try:
        new_data = np.rec.array(data, self._recarray_type_list)
    except:
        # build a human-readable description of the expected record
        # layout for the error message
        data_expected = []
        for data_type in self._recarray_type_list:
            data_expected.append(f"<{data_type[0]}>")
        message = (
            "An error occurred when storing data "
            '"{}" in a recarray. {} data is a one '
            "or two dimensional list containing "
            'the variables "{}" (some variables '
            "may be optional, see MF6 "
            'documentation), but data "{}" was '
            "supplied.".format(
                self.data_dimensions.structure.name,
                self.data_dimensions.structure.name,
                " ".join(data_expected),
                data,
            )
        )
        type_, value_, traceback_ = sys.exc_info()
        raise MFDataException(
            self.data_dimensions.structure.get_model(),
            self.data_dimensions.structure.get_package(),
            self.data_dimensions.structure.path,
            "setting array data",
            self.data_dimensions.structure.name,
            inspect.stack()[0][3],
            type_,
            value_,
            traceback_,
            message,
            self._simulation_data.debug,
        )
    if self._simulation_data.verify_data:
        # optional validation of record lengths and cellid sizes
        self._verify_list(new_data)
    return new_data
def make_tuple_cellids(self, data):
    """Collapse cellids stored as individual spatial fields (layer,
    row, column, ...) into single cellid tuples.

    Parameters
    ----------
    data : list
        Records whose cellid components appear as separate entries.

    Returns
    -------
    list
        Records with each cellid gathered into one tuple.
    """
    # number of spatial coordinates a complete cellid must contain
    ncoord = (
        self.data_dimensions.get_model_grid().get_num_spatial_coordinates()
    )
    fixed_rows = []
    pending = ()
    for row in data:
        fixed = []
        for value, is_cellid_part in zip(row, self.recarray_cellid_list_ex):
            if is_cellid_part:
                # accumulate coordinates until a full cellid is built
                pending += (value,)
                if len(pending) == ncoord:
                    fixed.append(pending)
                    pending = ()
            else:
                fixed.append(value)
        fixed_rows.append(tuple(fixed))
    return fixed_rows
def tuple_cellids(self, data):
    """Report whether cellids in the first record of ``data`` are
    stored as single tuples (True) or spread across separate integer
    columns (False).  True when no cellid field is present."""
    for value, is_cellid in zip(data[0], self.recarray_cellid_list):
        if not is_cellid:
            continue
        # an int value means the cellid occupies separate scalar
        # columns (e.g. one column each for layer, row, column)
        return not isinstance(value, int)
    return True
def resolve_typelist(self, data):
    """Return the recarray type list matching how cellids appear in
    ``data``: the standard list for tupled cellids, the expanded
    list for per-column cellids."""
    return (
        self._recarray_type_list
        if self.tuple_cellids(data)
        else self._recarray_type_list_ex
    )
def resolve_cellidlist(self, data):
    """Return the cellid flag list matching how cellids appear in
    ``data``: the standard list for tupled cellids, the expanded
    list for per-column cellids."""
    return (
        self.recarray_cellid_list
        if self.tuple_cellids(data)
        else self.recarray_cellid_list_ex
    )
def _resolve_multitype_fields(self, data):
# find any data fields where the data is not a consistent type
itype_len = len(self._recarray_type_list)
for data_entry in data:
for index, data_val in enumerate(data_entry):
if (
index < itype_len
and self._recarray_type_list[index][1] != object
and not isinstance(
data_val, self._recarray_type_list[index][1]
)
and (
not isinstance(data_val, int)
or self._recarray_type_list[index][1] != float
)
):
# for inconsistent types use generic object type
self._recarray_type_list[index] = (
self._recarray_type_list[index][0],
object,
)
def store_external(
    self,
    file_path,
    layer=None,
    multiplier=None,
    print_format=None,
    data=None,
    do_not_verify=False,
    binary=False,
):
    """Store data for ``layer`` in the external file ``file_path``.

    When ``data`` is supplied it is written to the file (text or
    binary, recarray or array form); otherwise existing storage is
    simply re-pointed at the file.  ``multiplier`` and
    ``print_format`` become the layer's factor and iprn attributes.
    ``do_not_verify`` is not referenced in this body — presumably
    kept for interface compatibility; confirm against callers.
    """
    if multiplier is None:
        multiplier = [self.get_default_mult()]
    # resolve layer/multiplier to a single-layer form
    layer_new, multiplier = self._store_prep(layer, multiplier)
    # pathing to external file
    data_dim = self.data_dimensions
    model_name = data_dim.package_dim.model_dim[0].model_name
    fp_relative = file_path
    if model_name is not None:
        rel_path = self._simulation_data.mfpath.model_relative_path[
            model_name
        ]
        if rel_path is not None and len(rel_path) > 0 and rel_path != ".":
            # include model relative path in external file path
            # only if model relative path is not already in external
            # file path i.e. when reading!
            fp_rp_l = fp_relative.split(os.path.sep)
            rp_l_r = rel_path.split(os.path.sep)[::-1]
            for i, rp in enumerate(rp_l_r):
                if rp != fp_rp_l[len(rp_l_r) - i - 1]:
                    fp_relative = os.path.join(rp, fp_relative)
    fp = self._simulation_data.mfpath.resolve_path(fp_relative, model_name)
    if data is not None:
        if self.data_structure_type == DataStructureType.recarray:
            # create external file and write file entry to the file
            # store data internally first so that a file entry
            # can be generated
            self.store_internal(
                data,
                layer_new,
                False,
                [multiplier],
                None,
                False,
                print_format,
            )
            if binary:
                file_access = MFFileAccessList(
                    self.data_dimensions.structure,
                    self.data_dimensions,
                    self._simulation_data,
                    self._data_path,
                    self._stress_period,
                )
                file_access.write_binary_file(
                    self.layer_storage.first_item().internal_data,
                    fp,
                    self._model_or_sim.modeldiscrit,
                    precision="double",
                )
            else:
                # make sure folder exists
                file_path = os.path.split(fp)[0]
                if not os.path.exists(file_path):
                    os.makedirs(file_path)
                # create file
                try:
                    fd = open(fp, "w")
                except:
                    message = (
                        "Unable to open file {}. Make sure the "
                        "file is not locked and the folder exists"
                        ".".format(fp)
                    )
                    type_, value_, traceback_ = sys.exc_info()
                    raise MFDataException(
                        self.data_dimensions.structure.get_model(),
                        self.data_dimensions.structure.get_package(),
                        self.data_dimensions.structure.path,
                        "opening external file for writing",
                        data_dim.structure.name,
                        inspect.stack()[0][3],
                        type_,
                        value_,
                        traceback_,
                        message,
                        self._simulation_data.debug,
                    )
                # write the list data's formatted file entry
                ext_file_entry = self._get_file_entry()
                fd.write(ext_file_entry)
                fd.close()
            # set as external data
            self.layer_storage.first_item().internal_data = None
        else:
            # store data externally in file
            data_size = self.get_data_size(layer_new)
            data_type = data_dim.structure.data_item_structures[0].type
            if self._calc_data_size(data, 2) == 1 and data_size > 1:
                # constant data, need to expand
                self.layer_storage[layer_new].data_const_value = data
                self.layer_storage[
                    layer_new
                ].data_storage_type = DataStorageType.internal_constant
                data = self._fill_const_layer(layer)
            elif isinstance(data, list):
                data = self._to_ndarray(data, layer)
            if binary:
                text = self.data_dimensions.structure.name
                file_access = MFFileAccessArray(
                    self.data_dimensions.structure,
                    self.data_dimensions,
                    self._simulation_data,
                    self._data_path,
                    self._stress_period,
                )
                str_layered = self.data_dimensions.structure.layered
                file_access.write_binary_file(
                    data,
                    fp,
                    text,
                    self._model_or_sim.modeldiscrit,
                    self._model_or_sim.modeltime,
                    stress_period=self._stress_period,
                    precision="double",
                    write_multi_layer=(layer is None and str_layered),
                )
            else:
                file_access = MFFileAccessArray(
                    self.data_dimensions.structure,
                    self.data_dimensions,
                    self._simulation_data,
                    self._data_path,
                    self._stress_period,
                )
                file_access.write_text_file(
                    data,
                    fp,
                    data_type,
                    data_size,
                )
            # clear in-memory storage; the file is now authoritative
            self.layer_storage[layer_new].factor = multiplier
            self.layer_storage[layer_new].internal_data = None
            self.layer_storage[layer_new].data_const_value = None
    else:
        # no data supplied; just drop internal storage and point
        # at the (already existing) file
        if self.data_structure_type == DataStructureType.recarray:
            self.layer_storage.first_item().internal_data = None
        else:
            self.layer_storage[layer_new].factor = multiplier
            self.layer_storage[layer_new].internal_data = None
    self.set_ext_file_attributes(
        layer_new, fp_relative, print_format, binary
    )
def set_ext_file_attributes(self, layer, file_path, print_format, binary):
    """Mark ``layer`` as externally stored, recording its file name,
    print format (iprn), and binary flag."""
    storage = self.layer_storage[layer]
    storage.fname = file_path
    storage.iprn = print_format
    storage.binary = binary
    # point to the external file and set flags
    storage.data_storage_type = DataStorageType.external_file
def point_to_existing_external_file(self, arr_line, layer):
    """Register an already existing external file for ``layer``
    without writing any data."""
    # parse the open/close control line but do not store the data
    mult, fmt, is_binary, fname = self.process_open_close_line(
        arr_line, layer, store=False
    )
    self.set_ext_file_attributes(layer, fname, fmt, is_binary)
    self.layer_storage[layer].factor = mult
def external_to_external(
    self, new_external_file, multiplier=None, layer=None, binary=None
):
    """Copy this data's external file to ``new_external_file`` and
    re-point storage at the copy.

    Raises MFDataException when the storage is not an ndarray, the
    layer is invalid, or no source file name is recorded.  The
    stored factor and iprn are carried over; ``multiplier`` is
    accepted but not referenced in this body.
    """
    # currently only support files containing ndarrays
    if not (self.data_structure_type == DataStructureType.ndarray):
        message = (
            'Can not copy external file of type "{}". Only '
            "files containing ndarrays currently supported"
            ".".format(self.data_structure_type)
        )
        type_, value_, traceback_ = sys.exc_info()
        raise MFDataException(
            self.data_dimensions.structure.get_model(),
            self.data_dimensions.structure.get_package(),
            self.data_dimensions.structure.path,
            "copy external file",
            self.data_dimensions.structure.name,
            inspect.stack()[0][3],
            type_,
            value_,
            traceback_,
            message,
            self._simulation_data.debug,
        )
    if not (
        (layer is None and self.layer_storage.get_total_size() == 1)
        or (layer is not None and self.layer_storage.in_shape(layer))
    ):
        if layer is None:
            message = (
                "When no layer is supplied the data must contain "
                "only one layer. Data contains {} layers"
                ".".format(self.layer_storage.get_total_size())
            )
        else:
            message = f'layer "{layer}" is not a valid layer'
        type_, value_, traceback_ = sys.exc_info()
        raise MFDataException(
            self.data_dimensions.structure.get_model(),
            self.data_dimensions.structure.get_package(),
            self.data_dimensions.structure.path,
            "copy external file",
            self.data_dimensions.structure.name,
            inspect.stack()[0][3],
            type_,
            value_,
            traceback_,
            message,
            self._simulation_data.debug,
        )
    # get data storage
    if layer is None:
        # NOTE(review): default layer index here is 1, while other
        # methods in this class default to 0 or first_index() —
        # confirm this is intended
        layer = 1
    if self.layer_storage[layer].fname is None:
        message = f"No file name exists for layer {layer}."
        type_, value_, traceback_ = sys.exc_info()
        raise MFDataException(
            self.data_dimensions.structure.get_model(),
            self.data_dimensions.structure.get_package(),
            self.data_dimensions.structure.path,
            "copy external file",
            self.data_dimensions.structure.name,
            inspect.stack()[0][3],
            type_,
            value_,
            traceback_,
            message,
            self._simulation_data.debug,
        )
    # copy file to new location
    copyfile(self.layer_storage[layer].fname, new_external_file)
    # update storage to reference the copy, keeping factor/iprn
    if binary is None:
        binary = self.layer_storage[layer].binary
    self.store_external(
        new_external_file,
        layer,
        [self.layer_storage[layer].factor],
        self.layer_storage[layer].iprn,
        binary=binary,
    )
def external_to_internal(self, layer, store_internal=False):
    """Read this data's external file for ``layer`` and return the
    loaded data, optionally storing it internally.

    Supports ndarray (text or binary) and recarray (text or binary)
    storage; the stored factor is applied to ndarray data.  Raises
    MFDataException for any other storage type.
    """
    # reset comments
    self.pre_data_comments = None
    self.comments = {}
    if layer is None:
        layer = 0
    # load data from external file
    model_name = self.data_dimensions.package_dim.model_dim[0].model_name
    read_file = self._simulation_data.mfpath.resolve_path(
        self.layer_storage[layer].fname, model_name
    )
    # currently support files containing ndarrays or recarrays
    if self.data_structure_type == DataStructureType.ndarray:
        file_access = MFFileAccessArray(
            self.data_dimensions.structure,
            self.data_dimensions,
            self._simulation_data,
            self._data_path,
            self._stress_period,
        )
        if self.layer_storage[layer].binary:
            data_out = file_access.read_binary_data_from_file(
                read_file,
                self.get_data_dimensions(layer),
                self.get_data_size(layer),
                self._data_type,
                self._model_or_sim.modeldiscrit,
            )[0]
        else:
            data_out = file_access.read_text_data_from_file(
                self.get_data_size(layer),
                self._data_type,
                self.get_data_dimensions(layer),
                layer,
                read_file,
            )[0]
        if self.layer_storage[layer].factor is not None:
            # apply the layer multiplier to the loaded array
            data_out = data_out * self.layer_storage[layer].factor
        if store_internal:
            self.store_internal(data_out, layer)
        return data_out
    elif self.data_structure_type == DataStructureType.recarray:
        file_access = MFFileAccessList(
            self.data_dimensions.structure,
            self.data_dimensions,
            self._simulation_data,
            self._data_path,
            self._stress_period,
        )
        if self.layer_storage[layer].binary:
            data = file_access.read_binary_data_from_file(
                read_file, self._model_or_sim.modeldiscrit
            )
            data_out = self._build_recarray(data, layer, False)
        else:
            with open(read_file, "r") as fd_read_file:
                data_out = file_access.read_list_data_from_file(
                    fd_read_file,
                    self,
                    self._stress_period,
                    store_internal=False,
                )
        if store_internal:
            self.store_internal(data_out, layer)
        return data_out
    else:
        path = self.data_dimensions.structure.path
        message = (
            "Can not convert {} to internal data. External to "
            "internal file operations currently only supported "
            "for ndarrays.".format(path[-1])
        )
        type_, value_, traceback_ = sys.exc_info()
        raise MFDataException(
            self.data_dimensions.structure.get_model(),
            self.data_dimensions.structure.get_package(),
            self.data_dimensions.structure.path,
            "opening external file for writing",
            self.data_dimensions.structure.name,
            inspect.stack()[0][3],
            type_,
            value_,
            traceback_,
            message,
            self._simulation_data.debug,
        )
def internal_to_external(
    self,
    new_external_file,
    multiplier=None,
    layer=None,
    print_format=None,
    binary=False,
):
    """Move data stored internally for ``layer`` into the external
    file ``new_external_file``."""
    if layer is None:
        storage = self.layer_storage.first_item()
    else:
        storage = self.layer_storage[layer]
    if storage.data_storage_type == DataStorageType.internal_array:
        # full array already held in memory
        out_data = storage.internal_data
    else:
        # constant storage; expand to a full layer of values
        out_data = self._fill_const_layer(layer)
    self.store_external(
        new_external_file,
        layer,
        multiplier,
        print_format,
        out_data,
        binary=binary,
    )
def resolve_shape_list(
    self,
    data_item,
    repeat_count,
    current_key,
    data_line,
    cellid_size=None,
):
    """Determine whether ``data_item`` repeats again on this line.

    Returns
    -------
    (bool, bool)
        First flag: more repeats are expected.  Second flag: the
        shape is rule-based / of unknown length.
    """
    struct = self.data_dimensions.structure
    try:
        shape, shape_rule = self.data_dimensions.get_data_shape(
            data_item, struct, data_line, repeating_key=current_key
        )
    except Exception as se:
        comment = (
            'Unable to resolve shape for data "{}" field "{}"'
            ".".format(struct.name, data_item.name)
        )
        exc_type, exc_value, exc_tb = sys.exc_info()
        raise MFDataException(
            struct.get_model(),
            struct.get_package(),
            struct.path,
            "loading data list from package file",
            struct.name,
            inspect.stack()[0][3],
            exc_type,
            exc_value,
            exc_tb,
            comment,
            self._simulation_data.debug,
            se,
        )
    if cellid_size is not None:
        # strip cellid entries out of the resolved shape
        data_item.remove_cellid(shape, cellid_size)
    if len(shape) == 1:
        if repeat_count < shape[0]:
            return True, shape_rule is not None
        if shape[0] == -9999:
            # repeating unknown number of times in 1-D array
            return False, True
    return False, False
def _validate_cellid(self, arr_line, data_index):
if not self.data_dimensions.structure.model_data:
# not model data so this is not a cell id
return False
if arr_line is None:
return False
model_grid = self.data_dimensions.get_model_grid()
cellid_size = model_grid.get_num_spatial_coordinates()
if cellid_size + data_index > len(arr_line):
return False
for index, dim_size in zip(
range(data_index, cellid_size + data_index),
model_grid.get_model_dim(),
):
if not DatumUtil.is_int(arr_line[index]):
return False
val = int(arr_line[index])
if val <= 0 or val > dim_size:
return False
return True
def process_internal_line(self, arr_line):
    """Parse FACTOR/IPRN options from an INTERNAL control line given
    either as a token list or a dict.

    Returns
    -------
    (multiplier, print_format)
    """
    multiplier = self.get_default_mult()
    print_format = None
    if isinstance(arr_line, list):
        # tokens after position 0 are keyword/value pairs
        pos = 1
        while pos < len(arr_line):
            token = arr_line[pos]
            if not isinstance(token, str):
                break
            keyword = token.lower()
            if keyword == "factor" and pos + 1 < len(arr_line):
                multiplier = convert_data(
                    arr_line[pos + 1],
                    self.data_dimensions,
                    self._data_type,
                )
                pos += 2
            elif keyword == "iprn" and pos + 1 < len(arr_line):
                print_format = arr_line[pos + 1]
                pos += 2
            else:
                break
    elif isinstance(arr_line, dict):
        for key, value in arr_line.items():
            key_lc = key.lower()
            if key_lc == "factor":
                multiplier = convert_data(
                    value, self.data_dimensions, self._data_type
                )
            if key_lc == "iprn":
                print_format = value
    return multiplier, print_format
def process_open_close_line(self, arr_line, layer, store=True):
    """Parse an OPEN/CLOSE control line (token list or dict form)
    and, when ``store`` is True, store the referenced file as
    external data for ``layer``.

    Returns
    -------
    (multiplier, print_format, binary, data_file)
    """
    # process open/close line
    index = 2
    if self._data_type == DatumType.integer:
        multiplier = 1
    else:
        multiplier = 1.0
    print_format = None
    binary = False
    data_file = None
    data = None
    data_dim = self.data_dimensions
    if isinstance(arr_line, list):
        if len(arr_line) < 2 and store:
            message = (
                'Data array "{}" contains a OPEN/CLOSE '
                "that is not followed by a file. {}".format(
                    data_dim.structure.name, data_dim.structure.path
                )
            )
            type_, value_, traceback_ = sys.exc_info()
            raise MFDataException(
                self.data_dimensions.structure.get_model(),
                self.data_dimensions.structure.get_package(),
                self.data_dimensions.structure.path,
                "processing open/close line",
                data_dim.structure.name,
                inspect.stack()[0][3],
                type_,
                value_,
                traceback_,
                message,
                self._simulation_data.debug,
            )
        # scan optional keyword/value tokens after the file name
        while index < len(arr_line):
            if isinstance(arr_line[index], str):
                word = arr_line[index].lower()
                if word == "factor" and index + 1 < len(arr_line):
                    try:
                        multiplier = convert_data(
                            arr_line[index + 1],
                            self.data_dimensions,
                            self._data_type,
                        )
                    except Exception as ex:
                        message = (
                            "Data array {} contains an OPEN/CLOSE "
                            "with an invalid multiplier following "
                            'the "factor" keyword.'
                            ".".format(data_dim.structure.name)
                        )
                        type_, value_, traceback_ = sys.exc_info()
                        raise MFDataException(
                            self.data_dimensions.structure.get_model(),
                            self.data_dimensions.structure.get_package(),
                            self.data_dimensions.structure.path,
                            "processing open/close line",
                            data_dim.structure.name,
                            inspect.stack()[0][3],
                            type_,
                            value_,
                            traceback_,
                            message,
                            self._simulation_data.debug,
                            ex,
                        )
                    index += 2
                elif word == "iprn" and index + 1 < len(arr_line):
                    print_format = arr_line[index + 1]
                    index += 2
                elif word == "data" and index + 1 < len(arr_line):
                    data = arr_line[index + 1]
                    index += 2
                elif word == "binary" or word == "(binary)":
                    binary = True
                    index += 1
                else:
                    break
            else:
                break
        # save comments
        if index < len(arr_line):
            # remaining unrecognized tokens become a comment
            self.layer_storage[layer].comments = MFComment(
                " ".join(arr_line[index:]),
                self.data_dimensions.structure.path,
                self._simulation_data,
                layer,
            )
        if arr_line[0].lower() == "open/close":
            data_file = clean_filename(arr_line[1])
        else:
            data_file = clean_filename(arr_line[0])
    elif isinstance(arr_line, dict):
        for key, value in arr_line.items():
            if key.lower() == "factor":
                try:
                    multiplier = convert_data(
                        value, self.data_dimensions, self._data_type
                    )
                except Exception as ex:
                    message = (
                        "Data array {} contains an OPEN/CLOSE "
                        "with an invalid factor following the "
                        '"factor" keyword.'
                        ".".format(data_dim.structure.name)
                    )
                    type_, value_, traceback_ = sys.exc_info()
                    raise MFDataException(
                        self.data_dimensions.structure.get_model(),
                        self.data_dimensions.structure.get_package(),
                        self.data_dimensions.structure.path,
                        "processing open/close line",
                        data_dim.structure.name,
                        inspect.stack()[0][3],
                        type_,
                        value_,
                        traceback_,
                        message,
                        self._simulation_data.debug,
                        ex,
                    )
            if key.lower() == "iprn":
                print_format = value
            if key.lower() == "binary":
                binary = bool(value)
            if key.lower() == "data":
                data = value
        if "filename" in arr_line:
            data_file = clean_filename(arr_line["filename"])
    if data_file is None:
        message = (
            "Data array {} contains an OPEN/CLOSE without a "
            "fname (file name) specified"
            ".".format(data_dim.structure.name)
        )
        type_, value_, traceback_ = sys.exc_info()
        raise MFDataException(
            self.data_dimensions.structure.get_model(),
            self.data_dimensions.structure.get_package(),
            self.data_dimensions.structure.path,
            "processing open/close line",
            data_dim.structure.name,
            inspect.stack()[0][3],
            type_,
            value_,
            traceback_,
            message,
            self._simulation_data.debug,
        )
    if store:
        # store external info
        self.store_external(
            data_file,
            layer,
            [multiplier],
            print_format,
            binary=binary,
            data=data,
        )
        # add to active list of external files
        model_name = data_dim.package_dim.model_dim[0].model_name
        self._simulation_data.mfpath.add_ext_file(data_file, model_name)
    return multiplier, print_format, binary, data_file
@staticmethod
def _tupleize_data(data):
for index, data_line in enumerate(data):
if not isinstance(data_line, tuple):
if isinstance(data_line, list):
data[index] = tuple(data_line)
else:
data[index] = (data_line,)
def _verify_list(self, data):
    # Verify list data: for each "cellid" field, check that the
    # cellid holds the number of spatial coordinates required by
    # the model grid.
    if data is not None:
        model_grid = None
        cellid_size = None
        for data_line in data:
            data_line_len = len(data_line)
            for index in range(
                0, min(data_line_len, len(self._recarray_type_list))
            ):
                datadim = self.data_dimensions
                if (
                    self._recarray_type_list[index][0] == "cellid"
                    and datadim.get_model_dim(None).model_name is not None
                    and data_line[index] is not None
                ):
                    # this is a cell id. verify that it contains the
                    # correct number of integers
                    if cellid_size is None:
                        # cache grid lookups; same grid for all rows
                        model_grid = datadim.get_model_grid()
                        cellid_size = (
                            model_grid.get_num_spatial_coordinates()
                        )
                    # NOTE(review): with the trailing
                    # isinstance(..., int) test this error can never
                    # fire for tuple cellids, and for int cellids
                    # len() raises TypeError first — possibly meant
                    # "not isinstance"; confirm intended behavior
                    if (
                        cellid_size != 1
                        and len(data_line[index]) != cellid_size
                        and isinstance(data_line[index], int)
                    ):
                        message = (
                            'Cellid "{}" contains {} integer(s). '
                            "Expected a cellid containing {} "
                            "integer(s) for grid type"
                            " {}.".format(
                                data_line[index],
                                len(data_line[index]),
                                cellid_size,
                                str(model_grid.grid_type()),
                            )
                        )
                        type_, value_, traceback_ = sys.exc_info()
                        raise MFDataException(
                            self.data_dimensions.structure.get_model(),
                            self.data_dimensions.structure.get_package(),
                            self.data_dimensions.structure.path,
                            "verifying cellid",
                            self.data_dimensions.structure.name,
                            inspect.stack()[0][3],
                            type_,
                            value_,
                            traceback_,
                            message,
                            self._simulation_data.debug,
                        )
def _add_placeholders(self, data):
for idx, data_line in enumerate(data):
data_line_len = len(data_line)
if data_line_len < len(self._recarray_type_list):
for index in range(
data_line_len, len(self._recarray_type_list)
):
if self._recarray_type_list[index][1] == int:
self._recarray_type_list[index] = (
self._recarray_type_list[index][0],
object,
)
data_line += (None,)
elif self._recarray_type_list[index][1] == float:
data_line += (np.nan,)
else:
data_line += (None,)
data[idx] = data_line
elif data_line_len > len(self._recarray_type_list):
for index in range(
len(self._recarray_type_list), data_line_len
):
if data_line[-1] is None:
dl = list(data_line)
del dl[-1]
data_line = tuple(dl)
data[idx] = data_line
def _duplicate_last_item(self):
    """Append a copy of the last recarray type entry, numbering its
    name (``foo`` -> ``foo_1``, ``foo_1`` -> ``foo_2``)."""
    last_item = self._recarray_type_list[-1]
    name_parts = last_item[0].split("_")
    if DatumUtil.is_int(name_parts[-1]):
        # already numbered; bump the numeric suffix
        base = "_".join(name_parts[:-1])
        next_name = f"{base}_{int(name_parts[-1]) + 1}"
    else:
        next_name = f"{last_item[0]}_1"
    self._recarray_type_list.append((next_name, last_item[1]))
def _build_full_data(self, apply_multiplier=False):
    # Assemble and return the complete data for all layers as a
    # single array (or the stored scalar for scalar storage),
    # reading external files as needed.  When ``apply_multiplier``
    # is True each layer's factor is applied.  Returns None when a
    # required (non-aux) layer has no data.
    if self.data_structure_type == DataStructureType.scalar:
        return self.layer_storage.first_item().internal_data
    dimensions = self.get_data_dimensions(None)
    if dimensions[0] < 0:
        # dimensions can not be determined from dfn file, use
        # the size of the data provided as the dimensions
        dimensions = [self.layer_storage.get_total_size()]
    all_none = True
    np_data_type = self.data_dimensions.structure.get_datum_type()
    full_data = np.full(
        dimensions,
        np.nan,
        self.data_dimensions.structure.get_datum_type(True),
    )
    # aux variables are collected per layer and stacked at the end
    is_aux = self.data_dimensions.structure.name == "aux"
    if is_aux:
        aux_data = []
    if not self.layered:
        layers_to_process = [0]
    else:
        layers_to_process = self.layer_storage.indexes()
    for layer in layers_to_process:
        # resolve the multiplier for this layer
        if (
            self.layer_storage[layer].factor is not None
            and apply_multiplier
        ):
            mult = self.layer_storage[layer].factor
        elif self._data_type == DatumType.integer:
            mult = 1
        else:
            mult = 1.0
        if (
            self.layer_storage[layer].data_storage_type
            == DataStorageType.internal_array
        ):
            if (
                self.layer_storage[layer].internal_data is None
                or len(self.layer_storage[layer].internal_data) > 0
                and self.layer_storage[layer].internal_data[0] is None
            ):
                # no data available for this layer
                if is_aux:
                    full_data = None
                else:
                    return None
            elif (
                self.layer_storage.get_total_size() == 1
                or not self.layered
                or not self._has_layer_dim()
            ):
                # single layer (or no layer dimension); use directly
                full_data = self.layer_storage[layer].internal_data * mult
            else:
                full_data[layer] = (
                    self.layer_storage[layer].internal_data * mult
                )
        elif (
            self.layer_storage[layer].data_storage_type
            == DataStorageType.internal_constant
        ):
            if (
                self.layer_storage.get_total_size() == 1
                or not self.layered
                or not self._has_layer_dim()
            ):
                full_data = self._fill_const_layer(layer) * mult
            else:
                full_data[layer] = self._fill_const_layer(layer) * mult
        else:
            # external file storage; read this layer from its file
            file_access = MFFileAccessArray(
                self.data_dimensions.structure,
                self.data_dimensions,
                self._simulation_data,
                self._data_path,
                self._stress_period,
            )
            model_name = self.data_dimensions.package_dim.model_dim[
                0
            ].model_name
            read_file = self._simulation_data.mfpath.resolve_path(
                self.layer_storage[layer].fname, ""
            )
            if self.layer_storage[layer].binary:
                data_out = (
                    file_access.read_binary_data_from_file(
                        read_file,
                        self.get_data_dimensions(layer),
                        self.get_data_size(layer),
                        self._data_type,
                        self._model_or_sim.modeldiscrit,
                        False,
                    )[0]
                    * mult
                )
            else:
                data_out = (
                    file_access.read_text_data_from_file(
                        self.get_data_size(layer),
                        np_data_type,
                        self.get_data_dimensions(layer),
                        layer,
                        read_file,
                    )[0]
                    * mult
                )
            if (
                self.layer_storage.get_total_size() == 1
                or not self.layered
            ):
                full_data = data_out
            else:
                if is_aux and full_data.shape == data_out.shape:
                    full_data = data_out
                else:
                    full_data[layer] = data_out
        if is_aux:
            if full_data is not None:
                all_none = False
            # keep this layer's aux result and reset the buffer
            aux_data.append(full_data)
            full_data = np.full(
                dimensions,
                np.nan,
                self.data_dimensions.structure.get_datum_type(True),
            )
    if is_aux:
        if all_none:
            return None
        else:
            return np.stack(aux_data, axis=0)
    else:
        return full_data
def _resolve_layer(self, layer):
if layer is None:
return self.layer_storage.first_index()
else:
return layer
def _to_ndarray(self, data, layer):
    """Convert (possibly nested) list ``data`` into an ndarray shaped
    to the dimensions of ``layer``."""
    shape = self.get_data_dimensions(layer)
    # walk the nested lists in order while filling the array
    return self._fill_dimensions(MultiListIter(data), shape)
def _fill_const_layer(self, layer):
data_dimensions = self.get_data_dimensions(layer)
if layer is None:
ls = self.layer_storage.first_item()
else:
ls = self.layer_storage[layer]
if data_dimensions[0] < 0:
return ls.data_const_value[0]
else:
data_type = self.data_dimensions.structure.get_datum_type(
numpy_type=True
)
return np.full(data_dimensions, ls.data_const_value[0], data_type)
def _is_type(self, data_item, data_type):
    """Loosely check whether ``data_item`` is compatible with
    ``data_type``; unsupported types report True (with a console
    warning at normal verbosity)."""
    if data_type in (DatumType.string, DatumType.keyword):
        # any token is an acceptable string/keyword
        return True
    if data_type == DatumType.integer:
        return DatumUtil.is_int(data_item)
    if data_type == DatumType.double_precision:
        return DatumUtil.is_float(data_item)
    verbose = (
        self._simulation_data.verbosity_level.value
        >= VerbosityLevel.normal.value
    )
    if data_type == DatumType.keystring:
        # TODO: support keystring type
        if verbose:
            print("Keystring type currently not supported.")
        return True
    if verbose:
        print(f"{data_type} type checking currently not supported")
    return True
def _fill_dimensions(self, data_iter, dimensions):
    """Build a data object of shape ``dimensions`` filled from ``data_iter``.

    Parameters
    ----------
    data_iter : iterator
        Iterator yielding the data values in order.
    dimensions : list of int
        Shape of the data object to build.

    Returns
    -------
    ndarray, recarray, or scalar, depending on this storage's
    ``data_structure_type``.

    Raises
    ------
    MFDataException
        When a data line cannot be stored in the recarray.
    """
    if self.data_structure_type == DataStructureType.ndarray:
        np_dtype = MFFileAccess.datum_to_numpy_type(self._data_type)[0]
        # initialize array
        data_array = np.ndarray(shape=dimensions, dtype=np_dtype)
        # fill array.  direct item assignment is used here instead of
        # ndarray.itemset, which was removed in NumPy 2.0.
        for index in ArrayIndexIter(dimensions):
            data_array[index] = next(data_iter)
        return data_array
    elif self.data_structure_type == DataStructureType.scalar:
        return next(data_iter)
    else:
        data_array = None
        data_line = ()
        # fill array
        # NOTE(review): current_col is never incremented in this loop,
        # so rows are only flushed when dimensions[1] == 1 — confirm
        # whether multi-column recarrays ever reach this path.
        array_index_iter = ArrayIndexIter(dimensions)
        current_col = 0
        for index in array_index_iter:
            data_line += (index,)
            if current_col == dimensions[1] - 1:
                try:
                    if data_array is None:
                        data_array = np.rec.array(
                            data_line, self._recarray_type_list
                        )
                    else:
                        rec_array = np.rec.array(
                            data_line, self._recarray_type_list
                        )
                        data_array = np.hstack((data_array, rec_array))
                # narrowed from a bare "except:", which would also trap
                # KeyboardInterrupt/SystemExit
                except Exception:
                    message = (
                        "An error occurred when storing data "
                        '"{}" in a recarray. Data line being '
                        "stored: {}".format(
                            self.data_dimensions.structure.name, data_line
                        )
                    )
                    type_, value_, traceback_ = sys.exc_info()
                    raise MFDataException(
                        self.data_dimensions.structure.get_model(),
                        self.data_dimensions.structure.get_package(),
                        self.data_dimensions.structure.path,
                        "processing open/close line",
                        # bug fix: "dimensions" is the shape list and has
                        # no "structure" attribute; use the stored data
                        # dimensions object for the data name instead
                        self.data_dimensions.structure.name,
                        inspect.stack()[0][3],
                        type_,
                        value_,
                        traceback_,
                        message,
                        self._simulation_data.debug,
                    )
                current_col = 0
                data_line = ()
            data_array[index] = next(data_iter)
        return data_array
def set_tas(self, tas_name, tas_label, current_key, check_name=True):
    """Point this data at the time array series ``tas_name``.

    Optionally warns when ``tas_name`` is not declared in any of the
    package's time series files, then stores the scalar string
    "<tas_label> <tas_name>" as the data for ``current_key``.
    """
    if check_name:
        known_names = self.data_dimensions.package_dim.get_tasnames()
        verbose = (
            self._simulation_data.verbosity_level.value
            >= VerbosityLevel.normal.value
        )
        if tas_name.lower() not in known_names and verbose:
            print(
                "WARNING: Time array series name {} not found in any "
                "time series file".format(tas_name)
            )
    # this is a time series array with a valid tas variable
    self.data_structure_type = DataStructureType.scalar
    try:
        self.set_data(f"{tas_label} {tas_name}", 0, key=current_key)
    except Exception as ex:
        type_, value_, traceback_ = sys.exc_info()
        structure = self.data_dimensions.structure
        raise MFDataException(
            structure.get_model(),
            structure.get_package(),
            structure.path,
            "storing data",
            structure.name,
            inspect.stack()[0][3],
            type_,
            value_,
            traceback_,
            None,
            self._simulation_data.debug,
            ex,
        )
def resolve_data_size(self, index):
    """Return how many recarray columns belong to the data element at
    ``index``.

    Repeating data elements are assumed to follow the naming format
    <data_element_name>_X; consecutive columns sharing the same base
    name are counted as one element.  Only valid for recarray storage.
    """
    if self.data_structure_type != DataStructureType.recarray:
        message = (
            "Data structure type is {}. Data structure type must "
            "be recarray.".format(self.data_structure_type)
        )
        type_, value_, traceback_ = sys.exc_info()
        raise MFDataException(
            self.data_dimensions.structure.get_model(),
            self.data_dimensions.structure.get_package(),
            self.data_dimensions.structure.path,
            "resolving data size",
            self.data_dimensions.structure.name,
            inspect.stack()[0][3],
            type_,
            value_,
            traceback_,
            message,
            self._simulation_data.debug,
        )
    internal_data = self.layer_storage.first_item().internal_data
    if len(internal_data[0]) <= index:
        return 0
    names = internal_data.dtype.names
    base_parts = names[index].split("_")
    if len(base_parts) == 1:
        # not a repeating element
        return 1
    # count columns until the base name changes
    for pos in range(index + 1, len(names)):
        if names[pos].split("_")[0] != base_parts[0]:
            return pos - index
    return len(names) - index
def build_type_list(
    self,
    data_set=None,
    data=None,
    resolve_data_shape=True,
    key=None,
    nseg=None,
    cellid_expanded=False,
    min_size=False,
    overwrite_existing_type_list=True,
):
    """Build the recarray dtype, a list of (name, type) tuples, that
    describes one line of this data's list structure.

    Parameters
    ----------
    data_set : optional
        Structure to build the type list from.  When None, the data's
        own structure is used and the type lists are reset.
    data : list, optional
        Existing data, used to resolve variable-length shapes.
    resolve_data_shape : bool
        When True, resolve each item's shape against the data
        dimensions; otherwise a shape of one is assumed.
    key : optional
        Repeating key passed through to shape resolution.
    nseg : int, optional
        Explicit segment count used to resolve "nseg"-based shapes.
    cellid_expanded : bool
        When True, return the expanded type list, where each cellid is
        split into one entry per spatial coordinate.
    min_size : bool
        When True, build the smallest valid type list (optional
        trailing items are skipped).
    overwrite_existing_type_list : bool
        When False, restore the previously stored type lists before
        returning the newly built one.

    Returns
    -------
    list of (name, type) tuples suitable for recarray construction.
    """
    if not overwrite_existing_type_list:
        # remember current lists so they can be restored on exit
        existing_type_list = self._recarray_type_list
        existing_type_list_ex = self._recarray_type_list_ex
    if data_set is None:
        # top-level call: reset state and start from this data's
        # structure
        self.jagged_record = False
        self._recarray_type_list = []
        self._recarray_type_list_ex = []
        self.recarray_cellid_list = []
        self.recarray_cellid_list_ex = []
        data_set = self.data_dimensions.structure
    # NOTE(review): initial_keyword is assigned below but never read in
    # this method — possibly leftover from an earlier revision.
    initial_keyword = True
    package_dim = self.data_dimensions.package_dim
    for data_item, index in zip(
        data_set.data_item_structures,
        range(0, len(data_set.data_item_structures)),
    ):
        # handle optional mnames
        if (
            not data_item.optional
            or len(data_item.name) < 5
            or data_item.name.lower()[0:5] != "mname"
            or not self.in_model
        ):
            # pick the numpy rec type: explicit override by column
            # position wins, then the item structure's own type
            overrides = self._data_type_overrides
            if len(self._recarray_type_list) in overrides:
                data_type = overrides[len(self._recarray_type_list)]
            elif isinstance(data_item, MFDataItemStructure):
                data_type = data_item.get_rec_type()
            else:
                data_type = None
            if data_item.name.lower() == "aux" and resolve_data_shape:
                # expand "aux" into one column per auxiliary variable
                aux_var_names = package_dim.get_aux_variables()
                if aux_var_names is not None:
                    for aux_var_name in aux_var_names[0]:
                        if aux_var_name.lower() != "auxiliary":
                            self._append_type_lists(
                                aux_var_name, data_type, False
                            )
            elif data_item.type == DatumType.record:
                # record within a record, recurse
                # NOTE(review): positional args bind data=True and
                # resolve_data_shape=data here, which looks unintended
                # given the signature — confirm against upstream.
                self.build_type_list(data_item, True, data)
            elif data_item.type == DatumType.keystring:
                self.jagged_record = True
                self._append_type_lists(
                    data_item.name, data_type, data_item.is_cellid
                )
                # add potential data after keystring to type list
                ks_data_item = deepcopy(data_item)
                ks_data_item.type = DatumType.string
                ks_data_item.name = f"{ks_data_item.name}_data"
                ks_rec_type = ks_data_item.get_rec_type()
                if not min_size:
                    self._append_type_lists(
                        ks_data_item.name,
                        ks_rec_type,
                        ks_data_item.is_cellid,
                    )
                if (
                    index == len(data_set.data_item_structures) - 1
                    and data is not None
                ):
                    # keystring is the last item: size the type list to
                    # the longest (or shortest) provided data line
                    idx = 1
                    (
                        line_max_size,
                        line_min_size,
                    ) = self._get_max_min_data_line_size(data)
                    if min_size:
                        line_size = line_min_size
                    else:
                        line_size = line_max_size
                    type_list = self.resolve_typelist(data)
                    while len(type_list) < line_size:
                        # keystrings at the end of a line can contain
                        # items of variable length. assume everything at
                        # the end of the data line is related to the last
                        # keystring
                        name = f"{ks_data_item.name}_{idx}"
                        self._append_type_lists(
                            name, ks_rec_type, ks_data_item.is_cellid
                        )
                        idx += 1
            elif (
                data_item.name != "boundname"
                or self.data_dimensions.package_dim.boundnames()
            ):
                # don't include initial keywords
                if (
                    data_item.type != DatumType.keyword
                    or data_set.block_variable
                ):
                    initial_keyword = False
                    shape_rule = None
                    if data_item.tagged:
                        # tagged non-keyword items get a leading label
                        # column
                        if (
                            data_item.type != DatumType.string
                            and data_item.type != DatumType.keyword
                        ):
                            name = f"{data_item.name}_label"
                            self._append_type_lists(
                                name, object, data_item.is_cellid
                            )
                    if (
                        nseg is not None
                        and len(data_item.shape) > 0
                        and isinstance(data_item.shape[0], str)
                        and data_item.shape[0][0:4] == "nseg"
                    ):
                        # nseg explicitly specified. resolve any formula
                        # nseg is in
                        model_dim = self.data_dimensions.get_model_dim(
                            None
                        )
                        exp_array = model_dim.build_shape_expression(
                            data_item.shape
                        )
                        if (
                            isinstance(exp_array, list)
                            and len(exp_array) == 1
                        ):
                            exp = exp_array[0]
                            resolved_shape = [
                                model_dim.resolve_exp(exp, nseg)
                            ]
                        else:
                            resolved_shape = [1]
                    else:
                        if resolve_data_shape:
                            data_dim = self.data_dimensions
                            (
                                resolved_shape,
                                shape_rule,
                            ) = data_dim.get_data_shape(
                                data_item,
                                data_set,
                                data,
                                repeating_key=key,
                                min_size=min_size,
                            )
                        else:
                            resolved_shape = [1]
                    if (
                        not resolved_shape
                        or len(resolved_shape) == 0
                        or resolved_shape[0] == -1
                    ):
                        # could not resolve shape
                        resolved_shape = [1]
                    elif (
                        resolved_shape[0] == -9999
                        or shape_rule is not None
                    ):
                        if data is not None and not min_size:
                            # shape is an indeterminate 1-d array and
                            # should consume the remainder of the data
                            max_s = PyListUtil.max_multi_dim_list_size(
                                data
                            )
                            resolved_shape[0] = max_s - len(
                                self._recarray_type_list
                            )
                        else:
                            # shape is indeterminate 1-d array and either
                            # no data provided to resolve or request is
                            # for minimum data size
                            resolved_shape[0] = 1
                            if not min_size:
                                self.jagged_record = True
                    if data_item.is_cellid:
                        if (
                            data_item.shape is not None
                            and len(data_item.shape) > 0
                            and data_item.shape[0] == "ncelldim"
                        ):
                            # A cellid is a single entry (tuple) in the
                            # recarray. Adjust dimensions accordingly.
                            data_dim = self.data_dimensions
                            grid = data_dim.get_model_grid()
                            size = grid.get_num_spatial_coordinates()
                            data_item.remove_cellid(resolved_shape, size)
                    if not data_item.optional or not min_size:
                        # one column per resolved repetition; repeated
                        # columns are suffixed with their index
                        for index in range(0, resolved_shape[0]):
                            if resolved_shape[0] > 1:
                                name = f"{data_item.name}_{index}"
                            else:
                                name = data_item.name
                            self._append_type_lists(
                                name, data_type, data_item.is_cellid
                            )
    if cellid_expanded:
        new_type_list_ex = self._recarray_type_list_ex
        if not overwrite_existing_type_list:
            self._recarray_type_list = existing_type_list
            self._recarray_type_list_ex = existing_type_list_ex
        return new_type_list_ex
    else:
        new_type_list = self._recarray_type_list
        if not overwrite_existing_type_list:
            self._recarray_type_list = existing_type_list
            self._recarray_type_list_ex = existing_type_list_ex
        return new_type_list
def get_default_mult(self):
    """Return the identity multiplier: 1 for integer data, 1.0 otherwise."""
    return 1 if self._data_type == DatumType.integer else 1.0
def _append_type_lists(self, name, data_type, iscellid):
# add entry(s) to type lists
self._recarray_type_list.append((name, data_type))
self.recarray_cellid_list.append(iscellid)
if iscellid and self._model_or_sim.model_type is not None:
# write each part of the cellid out as a separate entry
# to _recarray_list_list_ex
model_grid = self.data_dimensions.get_model_grid()
cellid_size = model_grid.get_num_spatial_coordinates()
# determine header for different grid types
if cellid_size == 1:
self._do_ex_list_append(name, int, iscellid)
elif cellid_size == 2:
self._do_ex_list_append("layer", int, iscellid)
self._do_ex_list_append("cell2d_num", int, iscellid)
else:
self._do_ex_list_append("layer", int, iscellid)
self._do_ex_list_append("row", int, iscellid)
self._do_ex_list_append("column", int, iscellid)
else:
self._do_ex_list_append(name, data_type, iscellid)
return iscellid
def _do_ex_list_append(self, name, data_type, iscellid):
self._recarray_type_list_ex.append((name, data_type))
self.recarray_cellid_list_ex.append(iscellid)
@staticmethod
def _calc_data_size(data, count_to=None, current_length=None):
if current_length is None:
current_length = [0]
if isinstance(data, np.ndarray):
current_length[0] += data.size
return data.size
if isinstance(data, str) or isinstance(data, dict):
return 1
try:
for data_item in data:
if hasattr(data_item, "__len__"):
DataStorage._calc_data_size(
data_item, count_to, current_length
)
else:
current_length[0] += 1
if count_to is not None and current_length[0] >= count_to:
return current_length[0]
except (ValueError, IndexError, TypeError):
return 1
return current_length[0]
@staticmethod
def _get_max_min_data_line_size(data):
max_size = 0
min_size = sys.maxsize
if data is not None:
for value in data:
if len(value) > max_size:
max_size = len(value)
if len(value) < min_size:
min_size = len(value)
if min_size == sys.maxsize:
min_size = 0
return max_size, min_size
def get_data_dimensions(self, layer):
    """Return the data's shape.

    When ``layer`` addresses one layer of a multi-layer array, the
    leading layer dimension(s) are stripped from the returned shape.
    """
    dims = self.data_dimensions.get_data_shape()[0]
    single_layer_of_many = (
        layer is not None
        and self.layer_storage.get_total_size() > 1
        and self._has_layer_dim()
    )
    if single_layer_of_many:
        # remove all "layer" dimensions from the list
        first_item = self.data_dimensions.structure.data_item_structures[0]
        dims = dims[len(first_item.layer_dims):]
    return dims
def _has_layer_dim(self):
return (
"nlay" in self.data_dimensions.structure.shape
or "nodes" in self.data_dimensions.structure.shape
)
def _store_prep(self, layer, multiplier):
    """Validate and normalize ``layer`` and ``multiplier`` before a
    store operation.

    Returns the (layer, multiplier) pair to use; raises
    MFDataException for an out-of-range layer.
    """
    layer_ok = layer is None or self.layer_storage.in_shape(layer)
    if not layer_ok:
        message = f"Layer {layer} is not a valid layer."
        type_, value_, traceback_ = sys.exc_info()
        raise MFDataException(
            self.data_dimensions.structure.get_model(),
            self.data_dimensions.structure.get_package(),
            self.data_dimensions.structure.path,
            "storing data",
            self.data_dimensions.structure.name,
            inspect.stack()[0][3],
            type_,
            value_,
            traceback_,
            message,
            self._simulation_data.debug,
        )
    if layer is None:
        # layer is none means the data provided is for all layers or this
        # is not layered data
        layer = (0,)
        self.layer_storage.list_shape = (1,)
        self.layer_storage.multi_dim_list = [
            self.layer_storage.first_item()
        ]
    mult_ml = MultiList(multiplier)
    if mult_ml.in_shape(layer):
        first = mult_ml.first_item()
        multiplier = self.get_default_mult() if first is None else first
    else:
        head = multiplier[0]
        multiplier = self.get_default_mult() if head is None else head
    return layer, multiplier
def get_data_size(self, layer):
    """Return the total number of data values for ``layer``: the
    product of its dimensions (1 for an empty shape)."""
    size = 1
    for dim in self.get_data_dimensions(layer):
        size *= dim
    return size