"""
modeldimensions module. Contains the model dimension information
"""
import sys
from ...utils.datautil import DatumUtil, NameIter
from ..data.mfstructure import DatumType
from ..mfbase import FlopyException, StructException, VerbosityLevel
from ..utils.mfenums import DiscretizationType
from .modelgrid import ModelGrid, UnstructuredModelGrid
from .simulationtime import SimulationTime
[docs]class DataDimensions:
"""
Resolves dimension information for model data using information contained
in the model files
Parameters
----------
package_dim : PackageDimensions
PackageDimension object for the package that the data is contained in
structure : MFDataStructure
MFDataStructure object of data whose dimensions need to be resolved
(optional)
Methods
----------
get_model_grid : ()
returns a model grid based on the current simulation data
def get_data_shape(data_item : MFDataItemStructure, data_set_struct :
MFDataStructure, data_item_num : int):
returns the shape of modflow data structure. returns shape of entire
data structure if no data item is specified, otherwise returns shape of
individual data time. user data and the dictionary path to the data
can be passed in "data" to help resolve the data shape
model_subspace_size : (subspace_string : str)
returns the size of the model subspace specified in subspace_string
See Also
--------
Notes
-----
Examples
--------
"""
def __init__(self, package_dim, structure):
self.package_dim = package_dim
self.structure = structure
self.model_grid = None
self.locked = False
[docs] def lock(self):
self.model_grid = None
self.locked = True
self.package_dim.lock()
[docs] def unlock(self):
self.locked = False
self.package_dim.unlock()
[docs] def get_model_grid(self, data_item_num=None, model_num=None):
if self.locked:
if self.model_grid is None or model_num is not None:
self.model_grid = self.get_model_dim(
data_item_num, model_num
).get_model_grid()
return self.model_grid
else:
return self.get_model_dim(
data_item_num, model_num
).get_model_grid()
[docs] def get_data_shape(
self,
data_item=None,
data_set_struct=None,
data=None,
data_item_num=None,
repeating_key=None,
min_size=False,
):
return self.get_model_dim(data_item_num).get_data_shape(
self.structure,
data_item,
data_set_struct,
data,
self.package_dim.package_path,
repeating_key=repeating_key,
min_size=min_size,
)
[docs] def model_subspace_size(self, subspace_string="", data_item_num=None):
return self.get_model_dim(data_item_num).model_subspace_size(
subspace_string
)
[docs] def get_model_dim(self, data_item_num, model_num=None):
if (
self.package_dim.model_dim is None
or (data_item_num is None and model_num is None)
or len(self.package_dim.model_dim) == 1
):
return self.package_dim.model_dim[0]
else:
if model_num is None:
model_num = self.structure.data_item_structures[data_item_num][
-1
]
if not (
len(self.structure.data_item_structures) > data_item_num
):
raise FlopyException(
'Data item index "{}" requested which '
"is greater than the maximum index of"
"{}.".format(
data_item_num,
len(self.structure.data_item_structures) - 1,
)
)
else:
if not len(self.package_dim.model_dim) > model_num:
raise FlopyException(
f'Model item index "{model_num}" requested which '
"is greater than the maximum index of"
f"{len(self.package_dim.model_dim)}."
)
if DatumUtil.is_int(model_num):
return self.package_dim.model_dim[int(model_num)]
[docs]class PackageDimensions:
"""
Resolves dimension information for common parts of a package
Parameters
----------
model_dim : ModelDimensions
ModelDimensions object for the model that the package is contained in
structure : MFPackageStructure
MFPackageStructure object of package
package_path : tuple
Tuple representing the path to this package
Methods
----------
get_aux_variables : (model_num=0)
returns the package's aux variables
boundnames : (model_num=0)
returns true of the boundnames option is in the package
get_tasnames : (model_num=0)
returns a dictionary of all the tas names used in a tas file
get_tsnames : (model_num=0)
returns a dictionary of all the ts names used in a ts file
See Also
--------
Notes
-----
Examples
--------
"""
def __init__(self, model_dim, structure, package_path):
self.model_dim = model_dim
self.package_struct = structure
self.package_path = package_path
self.locked = False
self.ts_names_dict = {}
self.tas_names_dict = {}
self.aux_variables = {}
self.boundnames_dict = {}
[docs] def lock(self):
self.locked = True
for model_dim in self.model_dim:
model_dim.lock()
[docs] def unlock(self):
self.locked = False
self.ts_names_dict = {}
self.tas_names_dict = {}
self.aux_variables = {}
self.boundnames_dict = {}
for model_dim in self.model_dim:
model_dim.unlock()
[docs] def get_aux_variables(self, model_num=0):
if self.locked and model_num in self.aux_variables:
return self.aux_variables[model_num]
aux_path = self.package_path + ("options", "auxiliary")
if aux_path in self.model_dim[model_num].simulation_data.mfdata:
ret_val = (
self.model_dim[model_num]
.simulation_data.mfdata[aux_path]
.get_data()
)
else:
ret_val = None
if self.locked:
self.aux_variables[model_num] = ret_val
return ret_val
[docs] def boundnames(self, model_num=0):
if self.locked and model_num in self.boundnames_dict:
return self.boundnames_dict[model_num]
ret_val = False
bound_path = self.package_path + ("options", "boundnames")
if bound_path in self.model_dim[model_num].simulation_data.mfdata:
if (
self.model_dim[model_num]
.simulation_data.mfdata[bound_path]
.get_data()
is not None
):
ret_val = True
if self.locked:
self.boundnames_dict[model_num] = ret_val
return ret_val
[docs] def get_tasnames(self, model_num=0):
if self.locked and model_num in self.tas_names_dict:
return self.tas_names_dict[model_num]
names_dict = {}
tas_record_path = self.package_path + ("options", "tas_filerecord")
if tas_record_path in self.model_dim[model_num].simulation_data.mfdata:
tas_record_data = (
self.model_dim[model_num]
.simulation_data.mfdata[tas_record_path]
.get_data()
)
if tas_record_data is not None:
name_iter = NameIter("tas")
for tas_name in name_iter:
tas_names_path = self.package_path + (
tas_name,
"attributes",
"time_series_namerecord",
)
if (
tas_names_path
in self.model_dim[model_num].simulation_data.mfdata
):
tas_names_data = (
self.model_dim[model_num]
.simulation_data.mfdata[tas_names_path]
.get_data()
)
if tas_names_data is not None:
names_dict[tas_names_data[0][0]] = 0
else:
break
if self.locked:
self.tas_names_dict[model_num] = names_dict
return names_dict
[docs] def get_tsnames(self, model_num=0):
if self.locked and model_num in self.ts_names_dict:
return self.ts_names_dict[model_num]
names_dict = {}
ts_record_path = self.package_path + ("options", "ts_filerecord")
if ts_record_path in self.model_dim[model_num].simulation_data.mfdata:
ts_record_data = (
self.model_dim[model_num]
.simulation_data.mfdata[ts_record_path]
.get_data()
)
if ts_record_data is not None:
name_iter = NameIter("ts")
for ts_name in name_iter:
ts_names_path = self.package_path + (
ts_name,
"attributes",
"time_series_namerecord",
)
if (
ts_names_path
in self.model_dim[model_num].simulation_data.mfdata
):
ts_names_data = (
self.model_dim[model_num]
.simulation_data.mfdata[ts_names_path]
.get_data()
)
if ts_names_data is not None:
for name in ts_names_data[0]:
names_dict[name] = 0
else:
break
if self.locked:
self.ts_names_dict[model_num] = names_dict
return names_dict
[docs]class ModelDimensions:
"""
Contains model dimension information and helper methods
Parameters
----------
model_name : str
name of the model
simulation_data : MFSimulationData
contains all simulation related data
structure : MFDataStructure
MFDataStructure object of data whose dimensions need to be resolved
(optional)
Attributes
----------
simulation_time : SimulationTime
object containing simulation time information
Methods
----------
get_model_grid : ()
returns a model grid based on the current simulation data
def get_data_shape(structure : MFDataStructure, data_item :
MFDataItemStructure, data_set_struct : MFDataStructure,
data : list, path : tuple, deconstruct_axis : bool):
returns the shape of modflow data structure. returns shape of entire
data structure if no data item is specified, otherwise returns shape of
individual data time. user data and the dictionary path to the data
can be passed in "data" to help resolve the data shape. if
deconstruct_axis is True any spatial axis will be automatically
deconstructed into its component parts (model grid will be
deconstructed into layer/row/col)
data_reshape : ()
reshapes jagged model data
model_subspace_size : (subspace_string : str)
returns the size of the model subspace specified in subspace_string
See Also
--------
Notes
-----
Examples
--------
"""
def __init__(self, model_name, simulation_data):
self.model_name = model_name
self.simulation_data = simulation_data
self._model_grid = None
self.simulation_time = SimulationTime(simulation_data)
self.locked = False
self.stored_shapes = {}
[docs] def lock(self):
self.locked = True
[docs] def unlock(self):
self.locked = False
self.stored_shapes = {}
# returns model grid
[docs] def get_model_grid(self):
if not self.locked or self._model_grid is None:
grid_type = ModelGrid.get_grid_type(
self.simulation_data, self.model_name
)
if not self._model_grid:
self._create_model_grid(grid_type)
else:
# if existing model grid is consistent with model data
if not self._model_grid.grid_type_consistent():
# create new model grid and return
self._create_model_grid(grid_type)
print(
"WARNING: Model grid type has changed. get_model_grid() "
"is returning a new model grid object of the appropriate "
"type. References to the old model grid object are "
"invalid."
)
self._model_grid.freeze_grid = True
return self._model_grid
def _create_model_grid(self, grid_type):
if grid_type == DiscretizationType.DIS:
self._model_grid = ModelGrid(
self.model_name, self.simulation_data, DiscretizationType.DIS
)
elif grid_type == DiscretizationType.DISV:
self._model_grid = ModelGrid(
self.model_name, self.simulation_data, DiscretizationType.DISV
)
elif grid_type == DiscretizationType.DISU:
self._model_grid = UnstructuredModelGrid(
self.model_name, self.simulation_data
)
elif grid_type == DiscretizationType.DISV1D:
self._model_grid = ModelGrid(
self.model_name,
self.simulation_data,
DiscretizationType.DISV1D,
)
elif grid_type == DiscretizationType.DIS2D:
self._model_grid = ModelGrid(
self.model_name, self.simulation_data, DiscretizationType.DIS2D
)
elif grid_type == DiscretizationType.DISV2D:
self._model_grid = ModelGrid(
self.model_name,
self.simulation_data,
DiscretizationType.DISV2D,
)
else:
self._model_grid = ModelGrid(
self.model_name,
self.simulation_data,
DiscretizationType.UNDEFINED,
)
# Returns a shape for a given set of axes
[docs] def get_data_shape(
self,
structure,
data_item=None,
data_set_struct=None,
data=None,
path=None,
deconstruct_axis=True,
repeating_key=None,
min_size=False,
):
if structure is None:
raise FlopyException(
"get_data_shape requires a valid structure object"
)
if self.locked:
if data_item is not None and data_item.path in self.stored_shapes:
return (
self.stored_shapes[data_item.path][0],
self.stored_shapes[data_item.path][1],
)
if structure.path in self.stored_shapes:
return (
self.stored_shapes[structure.path][0],
self.stored_shapes[structure.path][1],
)
shape_dimensions = []
shape_rule = None
shape_consistent = True
if data_item is None:
if (
structure.type == DatumType.recarray
or structure.type == DatumType.record
):
if structure.type == DatumType.record:
num_rows = 1
else:
num_rows, consistent_shape = self._resolve_data_item_shape(
structure
)[0]
shape_consistent = shape_consistent and consistent_shape
num_cols = 0
for data_item_struct in structure.data_item_structures:
if data_item_struct.type != DatumType.keyword:
(
num,
shape_rule,
consistent_shape,
) = self._resolve_data_item_shape(
data_item_struct,
path=path,
repeating_key=repeating_key,
)[0]
num_cols = num_cols + num
shape_consistent = (
shape_consistent and consistent_shape
)
shape_dimensions = [num_rows, num_cols]
else:
for data_item_struct in structure.data_item_structures:
if len(shape_dimensions) == 0:
(
shape_dimensions,
shape_rule,
consistent_shape,
) = self._resolve_data_item_shape(
data_item_struct, repeating_key=repeating_key
)
else:
(
dim,
shape_rule,
consistent_shape,
) = self._resolve_data_item_shape(
data_item_struct, repeating_key=repeating_key
)
shape_dimensions += dim
shape_consistent = shape_consistent and consistent_shape
if self.locked and shape_consistent:
self.stored_shapes[structure.path] = (
shape_dimensions,
shape_rule,
)
else:
(
shape_dimensions,
shape_rule,
consistent_shape,
) = self._resolve_data_item_shape(
data_item,
data_set_struct,
data,
path,
deconstruct_axis,
repeating_key=repeating_key,
min_size=min_size,
)
if self.locked and consistent_shape:
self.stored_shapes[data_item.path] = (
shape_dimensions,
shape_rule,
)
return shape_dimensions, shape_rule
def _resolve_data_item_shape(
self,
data_item_struct,
data_set_struct=None,
data=None,
path=None,
deconstruct_axis=True,
repeating_key=None,
min_size=False,
):
if isinstance(data, tuple):
data = [data]
shape_rule = None
consistent_shape = True
if path is None:
parent_path = data_item_struct.path[:-2]
else:
parent_path = path
shape_dimensions = []
if len(data_item_struct.shape) > 0:
shape = data_item_struct.shape[:]
# resolve approximate shapes
for index, shape_item in enumerate(shape):
if shape_item[0] == "<" or shape_item[0] == ">":
shape_rule = shape_item[0]
shape[index] = shape_item[1:]
if deconstruct_axis:
shape = self.deconstruct_axis(shape)
ordered_shape = self._order_shape(shape, data_item_struct)
ordered_shape_expression = self.build_shape_expression(
ordered_shape
)
for item in ordered_shape_expression:
dim_size = self.dimension_size(item[0])
if dim_size is not None:
if isinstance(dim_size, list):
shape_dimensions += dim_size
else:
shape_dimensions.append(
self.resolve_exp(item, dim_size)
)
elif item[0].lower() == "nstp" and DatumUtil.is_int(
repeating_key
):
# repeating_key is a stress period. get number of time
# steps for that stress period
shape_dimensions.append(
self.simulation_time.get_sp_time_steps(
int(repeating_key)
)
)
else:
result = None
if data_set_struct is not None:
# try to resolve dimension in the existing data
# set first
result = self.resolve_exp(
item,
self._find_in_dataset(
data_set_struct, item[0], data, min_size
),
)
if result:
consistent_shape = False
if result:
shape_dimensions.append(result)
else:
if (
item[0] == "any1d"
or item[0] == "naux"
or item[0] == "nconrno"
or item[0] == "unknown"
or item[0] == ":"
):
consistent_shape = False
shape_dimensions.append(-9999)
elif item[0] == "any2d":
consistent_shape = False
shape_dimensions.append(-9999)
shape_dimensions.append(-9999)
elif DatumUtil.is_int(item[0]):
shape_dimensions.append(int(item[0]))
else:
# try to resolve dimension within the
# existing block
result = self.simulation_data.mfdata.find_in_path(
parent_path, item[0]
)
if result[0] is not None:
data = result[0].get_data()
if data is None:
if (
self.simulation_data.verbosity_level.value
>= VerbosityLevel.verbose.value
):
print(
"INFORMATION: Unable to resolve "
"dimension of {} based on shape "
'"{}".'.format(
data_item_struct.path, item[0]
)
)
shape_dimensions.append(-9999)
consistent_shape = False
elif result[1] is not None:
# if int return first value otherwise
# return shape of data stored
if DatumUtil.is_int(data[result[1]]):
shape_dimensions.append(
self.resolve_exp(item, int(data))
)
else:
shape_dimensions.append(
self.resolve_exp(
item, len(data[result[1]])
)
)
else:
if DatumUtil.is_int(data):
shape_dimensions.append(
self.resolve_exp(item, int(data))
)
else:
shape_dimensions.append(
self.resolve_exp(item, len(data))
)
else:
if (
self.simulation_data.verbosity_level.value
>= VerbosityLevel.verbose.value
):
print(
"INFORMATION: Unable to resolve "
"dimension of {} based on shape "
'"{}".'.format(
data_item_struct.path, item[0]
)
)
shape_dimensions.append(-9999)
consistent_shape = False
else:
if (
data_item_struct.type == DatumType.recarray
or data_item_struct.type == DatumType.record
):
# shape is unknown
shape_dimensions.append(-9999)
consistent_shape = False
else:
# shape is assumed to be that of single entry
shape_dimensions.append(1)
return shape_dimensions, shape_rule, consistent_shape
[docs] def resolve_exp(self, expression, value):
if len(expression) == 3 and value is not None:
if not DatumUtil.is_int(expression[1]):
# try to resolve the 2nd term in the equation
expression[1] = self.dimension_size(expression[1])
if expression[1] is None:
except_str = (
'Expression "{}" contains an invalid '
"second term and can not be "
"resolved.".format(expression)
)
raise StructException(except_str, "")
if expression[2] == "+":
return value + int(expression[1])
elif expression[2] == "-":
return value - int(expression[1])
elif expression[2] == "*":
return value * int(expression[1])
elif expression[2] == "/":
return value / int(expression[1])
else:
except_str = (
'Expression "{}" contains an invalid operator '
"and can not be resolved.".format(expression)
)
raise StructException(except_str, "")
else:
return value
@staticmethod
def _find_in_dataset(data_set_struct, item, data, min_size=False):
if data is not None:
# find the current data item in data_set_struct
for index, data_item in zip(
range(0, len(data_set_struct.data_item_structures)),
data_set_struct.data_item_structures,
):
if (
data_item.name.lower() == item.lower()
and len(data[0]) > index
):
if min_size:
# use the minimum value
min_val = sys.maxsize
for data_line in data:
if data_line[index] < min_val:
min_val = data_line[index]
if min_val == sys.maxsize:
return 0
return min_val
else:
# use the maximum value
max_val = 0
for data_line in data:
if data_line[index] > max_val:
max_val = data_line[index]
return max_val
return None
[docs] @staticmethod
def build_shape_expression(shape_array):
new_expression_array = []
for entry in shape_array:
entry_minus = entry.split("-")
if len(entry_minus) > 1:
entry_minus.append("-")
new_expression_array.append(entry_minus)
else:
entry_plus = entry.split("+")
if len(entry_plus) > 1:
entry_plus.append("+")
new_expression_array.append(entry_plus)
else:
entry_mult = entry.split("*")
if len(entry_mult) > 1:
entry_mult.append("*")
new_expression_array.append(entry_mult)
else:
entry_div = entry.split("*")
if len(entry_div) > 1:
entry_div.append("/")
new_expression_array.append(entry_div)
else:
new_expression_array.append([entry])
return new_expression_array
def _order_shape(self, shape_array, data_item_struct):
new_shape_array = []
for entry in shape_array:
if entry in data_item_struct.layer_dims:
# "layer" dimensions get ordered first
new_shape_array.append(entry)
order = ["nlay", "nrow", "ncol"]
for order_item in order:
if order_item not in data_item_struct.layer_dims:
for entry in shape_array:
if entry == order_item:
new_shape_array.append(entry)
for entry in shape_array:
if entry not in order and entry not in data_item_struct.layer_dims:
new_shape_array.append(entry)
return new_shape_array
[docs] def model_subspace_size(self, subspace_string):
axis_found = False
subspace_size = 1
for axis in subspace_string:
dim_size = self.dimension_size(axis, False)
if dim_size is not None:
subspace_size = subspace_size * dim_size
axis_found = True
if axis_found:
return subspace_size
else:
return -1
[docs] def dimension_size(self, dimension_string, return_shape=True):
if dimension_string == "nrow":
return self.get_model_grid().num_rows()
elif dimension_string == "ncol":
return self.get_model_grid().num_columns()
elif dimension_string == "nlay":
return self.get_model_grid().num_layers()
elif dimension_string == "ncpl":
return self.get_model_grid().num_cells_per_layer()
elif dimension_string == "nodes":
if return_shape:
return self.get_model_grid().get_model_dim()
else:
return self.get_model_grid().num_cells()
elif dimension_string == "nja":
return self.get_model_grid().num_connections()
elif dimension_string == "ncelldim":
return self.get_model_grid().get_num_spatial_coordinates()
else:
return None
[docs] def deconstruct_axis(self, shape_array):
deconstructed_shape_array = []
for entry in shape_array:
if entry == "ncpl":
if self.get_model_grid().grid_type() == DiscretizationType.DIS:
deconstructed_shape_array.append("ncol")
deconstructed_shape_array.append("nrow")
else:
deconstructed_shape_array.append(entry)
elif entry == "nodes":
if self.get_model_grid().grid_type() == DiscretizationType.DIS:
deconstructed_shape_array.append("ncol")
deconstructed_shape_array.append("nrow")
deconstructed_shape_array.append("nlay")
elif (
self.get_model_grid().grid_type()
== DiscretizationType.DISV
):
deconstructed_shape_array.append("ncpl")
deconstructed_shape_array.append("nlay")
else:
deconstructed_shape_array.append(entry)
else:
deconstructed_shape_array.append(entry)
return deconstructed_shape_array