import inspect
import sys
import numpy as np
from ...datbase import DataType
from ...utils.datautil import clean_filename
from ..data import mfdata
from ..data.mfstructure import DatumType
from ..mfbase import ExtFileAction, MFDataException
from .mfdatastorage import DataStorage, DataStorageType, DataStructureType
from .mfdatautil import convert_data, to_string
from .mffileaccess import MFFileAccessScalar
[docs]class MFScalar(mfdata.MFData):
"""
Provides an interface for the user to access and update MODFLOW
scalar data. MFScalar objects are not designed to be directly constructed
by the end user. When a flopy for MODFLOW 6 package object is constructed,
the appropriate MFScalar objects are automatically built.
Parameters
----------
sim_data : MFSimulationData
data contained in the simulation
structure : MFDataStructure
describes the structure of the data
data : list or ndarray
actual data
enable : bool
enable/disable the array
path : tuple
path in the data dictionary to this MFArray
dimensions : MFDataDimensions
dimension information related to the model, package, and array
"""
def __init__(
self,
sim_data,
model_or_sim,
structure,
data=None,
enable=True,
path=None,
dimensions=None,
):
super().__init__(
sim_data, model_or_sim, structure, enable, path, dimensions
)
self._data_type = self.structure.data_item_structures[0].type
self._data_storage = self._new_storage()
if data is not None:
self.set_data(data)
@property
def data_type(self):
"""Type of data (DataType) stored in the scalar"""
return DataType.scalar
@property
def plottable(self):
"""If the scalar is plottable. Currently all scalars are not
plottable.
"""
return False
@property
def dtype(self):
"""The scalar's numpy data type (numpy.dtype)."""
if self.structure.type == DatumType.double_precision:
return np.float64
elif self.structure.type == DatumType.integer:
return np.int32
elif (
self.structure.type == DatumType.recarray
or self.structure.type == DatumType.record
or self.structure.type == DatumType.repeating_record
):
for data_item_struct in self.structure.data_item_structures:
if data_item_struct.type == DatumType.double_precision:
return np.float64
elif data_item_struct.type == DatumType.integer:
return np.int32
return None
[docs] def has_data(self, key=None):
"""Returns whether this object has data associated with it."""
try:
return self._get_storage_obj().has_data()
except Exception as ex:
type_, value_, traceback_ = sys.exc_info()
raise MFDataException(
self.structure.get_model(),
self.structure.get_package(),
self._path,
"checking for data",
self.structure.name,
inspect.stack()[0][3],
type_,
value_,
traceback_,
None,
self._simulation_data.debug,
ex,
)
@property
def data(self):
"""Returns the scalar data. Calls get_data with default parameters."""
return self.get_data()
[docs] def get_data(self, apply_mult=False, **kwargs):
"""Returns the data associated with this object.
Parameters
----------
apply_mult : bool
Parameter does not apply to scalar data.
Returns
-------
data : str, int, float, recarray
"""
try:
return self._get_storage_obj().get_data(apply_mult=apply_mult)
except Exception as ex:
type_, value_, traceback_ = sys.exc_info()
raise MFDataException(
self.structure.get_model(),
self.structure.get_package(),
self._path,
"getting data",
self.structure.name,
inspect.stack()[0][3],
type_,
value_,
traceback_,
None,
self._simulation_data.debug,
ex,
)
[docs] def set_data(self, data):
"""Sets the contents of the data to `data`.
Parameters
----------
data : str/int/float/recarray/list
Data to set
"""
self._resync()
if self.structure.type == DatumType.record:
if data is not None:
if (
not isinstance(data, list)
or isinstance(data, np.ndarray)
or isinstance(data, tuple)
):
if isinstance(data, str):
# convert string to list of entries
data = data.strip().split()
else:
data = [data]
else:
if isinstance(data, str):
if self.structure.file_data or self.structure.nam_file_data:
# clean up file name data
data = clean_filename(data)
else:
data = data.strip().split()[-1]
else:
while (
isinstance(data, list)
or isinstance(data, np.ndarray)
or isinstance(data, tuple)
):
data = data[0]
if (
isinstance(data, list) or isinstance(data, tuple)
) and len(data) > 1:
self._add_data_line_comment(data[1:], 0)
storage = self._get_storage_obj()
if data is None:
converted_data = data
else:
data_struct = self.structure.data_item_structures[0]
try:
converted_data = convert_data(
data, self.data_dimensions, self._data_type, data_struct
)
except Exception as ex:
type_, value_, traceback_ = sys.exc_info()
comment = (
f'Could not convert data "{data}" to type '
f'"{self._data_type}".'
)
raise MFDataException(
self.structure.get_model(),
self.structure.get_package(),
self._path,
"converting data",
self.structure.name,
inspect.stack()[0][3],
type_,
value_,
traceback_,
comment,
self._simulation_data.debug,
ex,
)
try:
storage.set_data(converted_data, key=self._current_key)
except Exception as ex:
type_, value_, traceback_ = sys.exc_info()
comment = (
f'Could not set data "{data}" to type "{self._data_type}".'
)
raise MFDataException(
self.structure.get_model(),
self.structure.get_package(),
self._path,
"setting data",
self.structure.name,
inspect.stack()[0][3],
type_,
value_,
traceback_,
comment,
self._simulation_data.debug,
ex,
)
[docs] def add_one(self):
"""Adds one if this is an integer scalar"""
datum_type = self.structure.get_datum_type()
if datum_type == int or datum_type == np.int32:
if self._get_storage_obj().get_data() is None:
try:
self._get_storage_obj().set_data(1)
except Exception as ex:
type_, value_, traceback_ = sys.exc_info()
comment = "Could not set data to 1"
raise MFDataException(
self.structure.get_model(),
self.structure.get_package(),
self._path,
"setting data",
self.structure.name,
inspect.stack()[0][3],
type_,
value_,
traceback_,
comment,
self._simulation_data.debug,
ex,
)
else:
try:
current_val = self._get_storage_obj().get_data()
except Exception as ex:
type_, value_, traceback_ = sys.exc_info()
raise MFDataException(
self.structure.get_model(),
self.structure.get_package(),
self._path,
"getting data",
self.structure.name,
inspect.stack()[0][3],
type_,
value_,
traceback_,
None,
self._simulation_data.debug,
ex,
)
try:
self._get_storage_obj().set_data(current_val + 1)
except Exception as ex:
type_, value_, traceback_ = sys.exc_info()
comment = f'Could increment data "{current_val}" by one.'
raise MFDataException(
self.structure.get_model(),
self.structure.get_package(),
self._path,
"setting data",
self.structure.name,
inspect.stack()[0][3],
type_,
value_,
traceback_,
comment,
self._simulation_data.debug,
ex,
)
else:
message = (
"{} of type {} does not support add one " "operation.".format(
self._data_name, self.structure.get_datum_type()
)
)
type_, value_, traceback_ = sys.exc_info()
raise MFDataException(
self.structure.get_model(),
self.structure.get_package(),
self._path,
"adding one to scalar",
self.structure.name,
inspect.stack()[0][3],
type_,
value_,
traceback_,
message,
self._simulation_data.debug,
)
[docs] def get_file_entry(
self,
values_only=False,
one_based=False,
ext_file_action=ExtFileAction.copy_relative_paths,
):
"""Returns a string containing the data formatted for a MODFLOW 6
file.
Parameters
----------
values_only : bool
Return values only excluding keywords
one_based : bool
Return one-based integer values
ext_file_action : ExtFileAction
How to handle external paths.
Returns
-------
file entry : str
"""
storage = self._get_storage_obj()
try:
if storage is None or self._get_storage_obj().get_data() is None:
return ""
except Exception as ex:
type_, value_, traceback_ = sys.exc_info()
raise MFDataException(
self.structure.get_model(),
self.structure.get_package(),
self._path,
"getting data",
self.structure.name,
inspect.stack()[0][3],
type_,
value_,
traceback_,
None,
self._simulation_data.debug,
ex,
)
if (
self.structure.type == DatumType.keyword
or self.structure.type == DatumType.record
):
try:
data = storage.get_data()
except Exception as ex:
type_, value_, traceback_ = sys.exc_info()
raise MFDataException(
self.structure.get_model(),
self.structure.get_package(),
self._path,
"getting data",
self.structure.name,
inspect.stack()[0][3],
type_,
value_,
traceback_,
None,
self._simulation_data.debug,
ex,
)
if self.structure.type == DatumType.keyword:
if data is not None and data is not False:
# keyword appears alone
return "{}{}\n".format(
self._simulation_data.indent_string,
self.structure.name.upper(),
)
else:
return ""
elif self.structure.type == DatumType.record:
text_line = []
index = 0
for data_item in self.structure.data_item_structures:
if (
data_item.type == DatumType.keyword
and not data_item.optional
):
if isinstance(data, list) or isinstance(data, tuple):
if len(data) == 1 and (
isinstance(data[0], tuple)
or isinstance(data[0], list)
):
data = data[0]
if len(data) > index and (
data[index] is not None
and data[index] is not False
):
text_line.append(data_item.name.upper())
if (
isinstance(data[index], str)
and data_item.name.upper()
!= data[index].upper()
and data[index] != ""
):
# since the data does not match the keyword
# assume the keyword was excluded
index -= 1
else:
if data is not None and data is not False:
text_line.append(data_item.name.upper())
else:
if data is not None and data != "":
if isinstance(data, list) or isinstance(data, tuple):
if len(data) > index:
if (
data[index] is not None
and data[index] is not False
):
current_data = data[index]
else:
break
elif data_item.optional is True:
break
else:
message = (
"Missing expected data. Data "
"size is {}. Index {} not"
"found.".format(len(data), index)
)
type_, value_, traceback_ = sys.exc_info()
raise MFDataException(
self.structure.get_model(),
self.structure.get_package(),
self._path,
"getting data",
self.structure.name,
inspect.stack()[0][3],
type_,
value_,
traceback_,
message,
self._simulation_data.debug,
)
else:
current_data = data
if data_item.type == DatumType.keyword:
if (
current_data is not None
and current_data is not False
):
if (
isinstance(data[index], str)
and data[index] == "#"
):
# if data has been commented out,
# keep the comment
text_line.append(data[index])
text_line.append(data_item.name.upper())
else:
try:
text_line.append(
to_string(
current_data,
self._data_type,
self._simulation_data,
self.data_dimensions,
data_item=data_item,
)
)
except Exception as ex:
message = (
'Could not convert "{}" of type '
'"{}" to a string'
".".format(current_data, self._data_type)
)
type_, value_, traceback_ = sys.exc_info()
raise MFDataException(
self.structure.get_model(),
self.structure.get_package(),
self._path,
"converting data to string",
self.structure.name,
inspect.stack()[0][3],
type_,
value_,
traceback_,
message,
self._simulation_data.debug,
)
index += 1
text = self._simulation_data.indent_string.join(text_line)
return f"{self._simulation_data.indent_string}{text}\n"
else:
data_item = self.structure.data_item_structures[0]
try:
if one_based:
if self.structure.type != DatumType.integer:
message = (
'Data scalar "{}" can not be one_based '
"because it is not an integer"
".".format(self.structure.name)
)
type_, value_, traceback_ = sys.exc_info()
raise MFDataException(
self.structure.get_model(),
self.structure.get_package(),
self._path,
"storing one based integer",
self.structure.name,
inspect.stack()[0][3],
type_,
value_,
traceback_,
message,
self._simulation_data.debug,
)
data = self._get_storage_obj().get_data() + 1
else:
data = self._get_storage_obj().get_data()
except Exception as ex:
type_, value_, traceback_ = sys.exc_info()
raise MFDataException(
self.structure.get_model(),
self.structure.get_package(),
self._path,
"getting data",
self.structure.name,
inspect.stack()[0][3],
type_,
value_,
traceback_,
None,
self._simulation_data.debug,
)
try:
# data
values = to_string(
data,
self._data_type,
self._simulation_data,
self.data_dimensions,
data_item=data_item,
verify_data=self._simulation_data.verify_data,
)
except Exception as ex:
message = (
'Could not convert "{}" of type "{}" '
"to a string.".format(data, self._data_type)
)
type_, value_, traceback_ = sys.exc_info()
raise MFDataException(
self.structure.get_model(),
self.structure.get_package(),
self._path,
"converting data to string",
self.structure.name,
inspect.stack()[0][3],
type_,
value_,
traceback_,
message,
self._simulation_data.debug,
)
if values_only:
return f"{self._simulation_data.indent_string}{values}"
else:
# keyword + data
return "{}{}{}{}\n".format(
self._simulation_data.indent_string,
self.structure.name.upper(),
self._simulation_data.indent_string,
values,
)
[docs] def load(
self,
first_line,
file_handle,
block_header,
pre_data_comments=None,
external_file_info=None,
):
"""Loads data from first_line (the first line of data) and open file
file_handle which is pointing to the second line of data. Returns a
tuple with the first item indicating whether all data was read
and the second item being the last line of text read from the file.
This method was only designed for internal FloPy use and is not
recommended for end users.
Parameters
----------
first_line : str
A string containing the first line of data.
file_handle : file descriptor
A file handle for the data file which points to the second
line of data
block_header : MFBlockHeader
Block header object that contains block header information
for the block containing this data
pre_data_comments : MFComment
Comments immediately prior to the data
external_file_info : list
Contains information about storing files externally
Returns
-------
more data : bool,
next data line : str
"""
super().load(
first_line,
file_handle,
block_header,
pre_data_comments=None,
external_file_info=None,
)
self._resync()
file_access = MFFileAccessScalar(
self.structure,
self.data_dimensions,
self._simulation_data,
self._path,
self._current_key,
)
return file_access.load_from_package(
first_line,
file_handle,
self._get_storage_obj(),
self._data_type,
self._keyword,
pre_data_comments,
)
def _new_storage(self, stress_period=0):
return DataStorage(
self._simulation_data,
self._model_or_sim,
self.data_dimensions,
self.get_file_entry,
DataStorageType.internal_array,
DataStructureType.scalar,
stress_period=stress_period,
data_path=self._path,
)
def _get_storage_obj(self, first_record=False):
return self._data_storage
[docs] def plot(self, filename_base=None, file_extension=None, **kwargs):
"""
Helper method to plot scalar objects
Parameters:
scalar : flopy.mf6.data.mfscalar object
filename_base : str
Base file name that will be used to automatically generate file
names for output image files. Plots will be exported as image
files if file_name_base is not None. (default is None)
file_extension : str
Valid matplotlib.pyplot file extension for savefig(). Only used
if filename_base is not None. (default is 'png')
Returns:
axes: list matplotlib.axes object
"""
from ...plot.plotutil import PlotUtilities
if not self.plottable:
raise TypeError("Scalar values are not plottable")
axes = PlotUtilities._plot_scalar_helper(
self,
filename_base=filename_base,
file_extension=file_extension,
**kwargs,
)
return axes
[docs]class MFScalarTransient(MFScalar, mfdata.MFTransient):
"""
Provides an interface for the user to access and update MODFLOW transient
scalar data. Transient scalar data is used internally by FloPy and should
not be used directly by the end user.
Parameters
----------
sim_data : MFSimulationData
data contained in the simulation
structure : MFDataStructure
describes the structure of the data
data : list or ndarray
actual data
enable : bool
enable/disable the array
path : tuple
path in the data dictionary to this MFArray
dimensions : MFDataDimensions
dimension information related to the model, package, and array
"""
def __init__(
self,
sim_data,
model_or_sim,
structure,
enable=True,
path=None,
dimensions=None,
):
mfdata.MFTransient.__init__(self)
MFScalar.__init__(
self,
sim_data=sim_data,
model_or_sim=model_or_sim,
structure=structure,
enable=enable,
path=path,
dimensions=dimensions,
)
self.repeating = True
@property
def data_type(self):
"""Type of data (DataType) stored in the scalar"""
return DataType.transientscalar
@property
def plottable(self):
"""If the scalar is plottable"""
if self.model is None:
return False
else:
return True
[docs] def add_transient_key(self, key):
"""Adds a new transient time allowing data for that time to be stored
and retrieved using the key `key`. Method is used
internally by FloPy and is not intended to the end user.
Parameters
----------
key : int
Zero-based stress period to add
"""
super().add_transient_key(key)
if isinstance(key, int):
stress_period = key
else:
stress_period = 1
self._data_storage[key] = super()._new_storage(stress_period)
[docs] def add_one(self, key=0):
"""Adds one to the data stored at key `key`. Method is used
internally by FloPy and is not intended to the end user.
Parameters
----------
key : int
Zero-based stress period to add
"""
self._update_record_prep(key)
super().add_one()
[docs] def has_data(self, key=None):
if key is None:
for sto_key in self._data_storage.keys():
self.get_data_prep(sto_key)
if super().has_data():
return True
else:
self.get_data_prep(key)
return super().has_data()
[docs] def get_data(self, key=0, **kwargs):
"""Returns the data for stress period `key`.
Parameters
----------
key : int
Zero-based stress period to return data from.
Returns
-------
data : str/int/float/recarray
"""
self.get_data_prep(key)
return super().get_data()
[docs] def set_data(self, data, key=None):
"""Sets the contents of the data at time `key` to `data`.
Parameters
----------
data : str/int/float/recarray/list
Data being set. Data can be a dictionary with keys as
zero-based stress periods and values as the data. If data is
a string, integer, double, recarray, or list of tuples, it
will be assigned to the the stress period specified in `key`.
If any is set to None, that stress period of data will be
removed.
key : int
Zero based stress period to assign data too. Does not apply
if `data` is a dictionary.
"""
if data is None and key is None:
return
if isinstance(data, dict):
# each item in the dictionary is a list for one stress period
# the dictionary key is the stress period the list is for
for key, list_item in data.items():
self._set_data_prep(list_item, key)
super().set_data(list_item)
else:
self._set_data_prep(data, key)
super().set_data(data)
[docs] def get_file_entry(
self, key=None, ext_file_action=ExtFileAction.copy_relative_paths
):
"""Returns a string containing the data at time `key` formatted for a
MODFLOW 6 file.
Parameters
----------
key : int
Zero based stress period to return data from.
ext_file_action : ExtFileAction
How to handle external paths.
Returns
-------
file entry : str
"""
if key is None:
file_entry = []
for sto_key in self._data_storage.keys():
if self.has_data(sto_key):
self._get_file_entry_prep(sto_key)
text_entry = super().get_file_entry(
ext_file_action=ext_file_action
)
file_entry.append(text_entry)
if len(file_entry) > 1:
return "\n\n".join(file_entry)
elif file_entry == 1:
return file_entry[0]
else:
return ""
else:
self._get_file_entry_prep(key)
return super().get_file_entry(ext_file_action=ext_file_action)
[docs] def load(
self,
first_line,
file_handle,
block_header,
pre_data_comments=None,
external_file_info=None,
):
"""Loads data from first_line (the first line of data) and open file
file_handle which is pointing to the second line of data. Returns a
tuple with the first item indicating whether all data was read
and the second item being the last line of text read from the file.
Parameters
----------
first_line : str
A string containing the first line of data in this scalar.
file_handle : file descriptor
A file handle for the data file which points to the second
line of data for this array
block_header : MFBlockHeader
Block header object that contains block header information
for the block containing this data
pre_data_comments : MFComment
Comments immediately prior to the data
external_file_info : list
Contains information about storing files externally
"""
self._load_prep(block_header)
return super().load(
first_line, file_handle, pre_data_comments, external_file_info
)
def _new_storage(self, stress_period=0):
return {}
def _get_storage_obj(self, first_record=False):
if first_record and isinstance(self._data_storage, dict):
for value in self._data_storage.values():
return value
return None
if (
self._current_key is None
or self._current_key not in self._data_storage
):
return None
return self._data_storage[self._current_key]
[docs] def plot(
self,
filename_base=None,
file_extension=None,
kper=0,
fignum=None,
**kwargs,
):
"""
Plot transient scalar model data
Parameters
----------
transientscalar : flopy.mf6.data.mfdatascalar.MFScalarTransient object
filename_base : str
Base file name that will be used to automatically generate file
names for output image files. Plots will be exported as image
files if file_name_base is not None. (default is None)
file_extension : str
Valid matplotlib.pyplot file extension for savefig(). Only used
if filename_base is not None. (default is 'png')
**kwargs : dict
axes : list of matplotlib.pyplot.axis
List of matplotlib.pyplot.axis that will be used to plot
data for each layer. If axes=None axes will be generated.
(default is None)
pcolor : bool
Boolean used to determine if matplotlib.pyplot.pcolormesh
plot will be plotted. (default is True)
colorbar : bool
Boolean used to determine if a color bar will be added to
the matplotlib.pyplot.pcolormesh. Only used if pcolor=True.
(default is False)
inactive : bool
Boolean used to determine if a black overlay in inactive
cells in a layer will be displayed. (default is True)
contour : bool
Boolean used to determine if matplotlib.pyplot.contour
plot will be plotted. (default is False)
clabel : bool
Boolean used to determine if matplotlib.pyplot.clabel
will be plotted. Only used if contour=True. (default is False)
grid : bool
Boolean used to determine if the model grid will be plotted
on the figure. (default is False)
masked_values : list
List of unique values to be excluded from the plot.
kper : str
MODFLOW zero-based stress period number to return. If
kper='all' then data for all stress period will be
extracted. (default is zero).
Returns
----------
axes : list
Empty list is returned if filename_base is not None. Otherwise
a list of matplotlib.pyplot.axis is returned.
"""
from ...plot.plotutil import PlotUtilities
if not self.plottable:
raise TypeError("Simulation level packages are not plottable")
axes = PlotUtilities._plot_transient2d_helper(
self,
filename_base=filename_base,
file_extension=file_extension,
kper=kper,
fignum=fignum,
**kwargs,
)
return axes