Source code for flopy.mf6.data.mfdatastorage

import inspect
import os
import sys
from copy import deepcopy
from enum import Enum
from shutil import copyfile

import numpy as np

from ...utils.datautil import (
    ArrayIndexIter,
    DatumUtil,
    MultiList,
    MultiListIter,
    PyListUtil,
    clean_filename,
)
from ..data import mfdatautil
from ..data.mfstructure import DatumType, MFDataItemStructure
from ..mfbase import MFDataException, VerbosityLevel
from .mfdatautil import MFComment, convert_data, iterable
from .mffileaccess import MFFileAccess, MFFileAccessArray, MFFileAccessList


class DataStorageType(Enum):
    """Enumeration of different ways that data can be stored"""

    internal_array = 1
    internal_constant = 2
    external_file = 3

class DataStructureType(Enum):
    """Enumeration of different data structures used to store data"""

    ndarray = 1
    recarray = 2
    scalar = 3

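# Example (editor's sketch, not part of the flopy source): the two enums are
# orthogonal. DataStructureType describes what the data looks like in memory,
# while DataStorageType describes where the authoritative copy lives. A 3D
# array written to an OPEN/CLOSE file pairs ndarray with external_file:
#
#     structure_type = DataStructureType.ndarray
#     storage_type = DataStorageType.external_file
#     if storage_type == DataStorageType.external_file:
#         print("data lives in a file, not in memory")
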
class LayerStorage:
    """
    Stores a single layer of data.

    Parameters
    ----------
    data_storage : DataStorage
        Parent data storage object that the layer is contained in
    lay_num : int
        Layer number of the layer being stored
    data_storage_type : DataStorageType
        Method used to store the data

    Attributes
    ----------
    internal_data : ndarray or recarray
        Data being stored, if the full data set is kept internally in memory
    data_const_value : int/float
        Constant value of the data being stored, if the data is a constant
    data_storage_type : DataStorageType
        Method used to store the data
    fname : str
        File name of the external file containing the data
    factor : int/float
        Factor to multiply the data by
    iprn : int
        Print code
    binary : bool
        Whether the data is stored in a binary file

    Methods
    -------
    get_const_val(layer)
        Gets the constant value of a given layer. The data storage type for
        the layer must be "internal_constant".
    get_data(layer) : ndarray/recarray/string
        Returns the data for the specified layer.
    set_data(data, layer=None, multiplier=[1.0])
        Sets the data being stored to "data" for layer "layer", replacing all
        data for that layer. A multiplier can be specified.

    See Also
    --------

    Notes
    -----

    Examples
    --------
    """

    def __init__(
        self,
        data_storage,
        lay_indexes,
        data_storage_type=DataStorageType.internal_array,
        data_type=None,
    ):
        self._data_storage_parent = data_storage
        self._lay_indexes = lay_indexes
        self.internal_data = None
        self.data_const_value = None
        self.data_storage_type = data_storage_type
        self.data_type = data_type
        self.fname = None
        if self.data_type == DatumType.integer:
            self.factor = 1
        else:
            self.factor = 1.0
        self.iprn = None
        self.binary = False

    def set_internal_constant(self):
        self.data_storage_type = DataStorageType.internal_constant

    def set_internal_array(self):
        self.data_storage_type = DataStorageType.internal_array

    @property
    def name(self):
        return self._data_storage_parent.data_dimensions.structure.name

    def __repr__(self):
        if self.data_storage_type == DataStorageType.internal_constant:
            return f"constant {self.get_data_const_val()}"
        else:
            return repr(self.get_data())

    def __str__(self):
        if self.data_storage_type == DataStorageType.internal_constant:
            return str(self.get_data_const_val())
        else:
            return str(self.get_data())

    def __getattr__(self, attr):
        if attr == "binary" or not hasattr(self, "binary"):
            raise AttributeError(attr)

        if attr == "array":
            return self._data_storage_parent.get_data(self._lay_indexes, True)
        elif attr == "__getstate__":
            raise AttributeError(attr)

    def set_data(self, data):
        self._data_storage_parent.set_data(
            data, self._lay_indexes, [self.factor]
        )

    def get_data(self):
        return self._data_storage_parent.get_data(self._lay_indexes, False)

    def get_data_const_val(self):
        if isinstance(self.data_const_value, list):
            return self.data_const_value[0]
        else:
            return self.data_const_value

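# Note (editor's sketch, not part of the flopy source): "factor" and "iprn"
# mirror the FACTOR and IPRN keywords on a MODFLOW 6 array control line, so a
# layer whose control line reads "INTERNAL FACTOR 2.0 IPRN 3" would be
# represented roughly as:
#
#     layer.data_storage_type = DataStorageType.internal_array
#     layer.factor = 2.0
#     layer.iprn = 3
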
class DataStorage:
    """
    Stores and retrieves data.

    Parameters
    ----------
    sim_data : simulation data class
        Reference to the simulation data class
    data_dimensions : data dimensions class
        A data dimensions class for the data being stored
    get_file_entry : method reference
        Method that returns the file entry for the stored data
    data_storage_type : enum
        How the data will be stored (internally, as a constant, as an
        external file)
    data_structure_type : enum
        What internal type the data is stored in (ndarray, recarray, scalar)
    layer_shape : int
        Number of data layers
    layered : bool
        Whether the data is layered
    layer_storage : MultiList<LayerStorage>
        One or more dimensional list of LayerStorage

    Attributes
    ----------
    data_storage_type : list
        List of data storage types, one for each layer
    data_const_value : list
        List of data constants, one for each layer
    external_file_path : list
        List of external file paths, one for each layer
    multiplier : list
        List of multipliers, one for each layer
    print_format : list
        List of print formats, one for each layer
    data_structure_type : DataStructureType
        What internal type the data is stored in (ndarray, recarray, scalar)
    layered : bool
        Whether the data is layered
    pre_data_comments : string
        Any comments before the start of the data
    comments : dict
        Any comments mixed in with the data; dictionary keys are data lines
    post_data_comments : string
        Any comments after the end of the data

    Methods
    -------
    override_data_type(index, data_type)
        Overrides the data type used in a recarray at index "index" with
        data type "data_type".
    get_external_file_path(layer)
        Gets the path to an external file for layer "layer".
    get_const_val(layer)
        Gets the constant value of a given layer. The data storage type for
        the layer must be "internal_constant".
    has_data(layer) : bool
        Returns True if data exists for the specified layer, False otherwise.
    get_data(layer) : ndarray/recarray/string
        Returns the data for the specified layer.
    update_item(data, key_index)
        Updates the data in a recarray at index "key_index" with data "data".
        "data" is a list containing all data for a single record in the
        recarray. The data structure type must be recarray.
    append_data(data)
        Appends data "data" to the end of a recarray. The data structure
        type must be recarray.
    set_data(data, layer=None, multiplier=[1.0])
        Sets the data being stored to "data" for layer "layer", replacing
        all data for that layer. A multiplier can be specified.
    get_active_layer_indices() : list
        Returns the indices of all layers expected to contain data.
    store_internal(data, layer=None, const=False, multiplier=[1.0])
        Stores data "data" at layer "layer" internally.
    store_external(file_path, layer=None, multiplier=[1.0],
            print_format=None, data=None, do_not_verify=False)
        Stores data "data" at layer "layer" externally in file "file_path".
    external_to_external(new_external_file, multiplier=None, layer=None)
        Copies existing external data to the new file location and points
        to the new file.
    external_to_internal(layer_num=None, store_internal=False) :
            ndarray/recarray
        Loads existing external data for layer "layer_num" and returns it.
        If store_internal is True it also stores the data internally,
        changing the storage type for the "layer_num" layer to internal.
    internal_to_external(new_external_file, multiplier=None, layer=None,
            print_format=None)
        Stores existing internal data for layer "layer" to external file
        "new_external_file".
    read_data_from_file(layer, fd=None, multiplier=None) : (ndarray, int)
        Reads in data from a given file "fd" as data from layer "layer".
        Returns the data as an ndarray along with the size of the data.
    to_string(val, type, is_cellid=False, possible_cellid=False)
        Converts data "val" of type "type" to a string. is_cellid is True
        if the data type is known to be a cellid and is treated as such.
        When possible_cellid is True the data is checked to see if it
        matches the shape/dimensions of a cellid before using it as one.
    resolve_data_size(index) : int
        Resolves the size of a given data element in a recarray based on
        the names in the existing rec_array. Assumes repeating data element
        names follow the format <data_element_name>_X. Returns the number
        of times the data element repeats.
    flatten()
        Converts layered data to non-layered data.
    make_layered()
        Converts non-layered data to layered data.

    See Also
    --------

    Notes
    -----

    Examples
    --------
    """

    def __init__(
        self,
        sim_data,
        model_or_sim,
        data_dimensions,
        get_file_entry,
        data_storage_type=DataStorageType.internal_array,
        data_structure_type=DataStructureType.ndarray,
        layer_shape=(1,),
        layered=False,
        stress_period=0,
        data_path=(),
    ):
        self.data_dimensions = data_dimensions
        self._model_or_sim = model_or_sim
        self._simulation_data = sim_data
        self._get_file_entry = get_file_entry
        self._data_type_overrides = {}
        self._data_storage_type = data_storage_type
        self._stress_period = stress_period
        self._data_path = data_path
        if not data_structure_type == DataStructureType.recarray:
            self._data_type = self.data_dimensions.structure.get_datum_type(
                return_enum_type=True
            )
        else:
            self._data_type = None
        self.layer_storage = MultiList(
            shape=layer_shape, callback=self._create_layer
        )
        self.data_structure_type = data_structure_type
        package_dim = self.data_dimensions.package_dim
        self.in_model = (
            self.data_dimensions is not None
            and len(package_dim.package_path) > 1
            and package_dim.model_dim[0].model_name is not None
            and package_dim.model_dim[0].model_name.lower()
            == package_dim.package_path[0]
        )

        if data_structure_type == DataStructureType.recarray:
            self.build_type_list(resolve_data_shape=False)

        self.layered = layered

        # initialize comments
        self.pre_data_comments = None
        self.comments = {}

    def __repr__(self):
        return self.get_data_str(True)

    def __str__(self):
        return self.get_data_str(False)

    def _create_layer(self, indexes):
        return LayerStorage(
            self, indexes, self._data_storage_type, self._data_type
        )

    def flatten(self):
        self.layered = False
        storage_type = self.layer_storage.first_item().data_storage_type
        self.layer_storage = MultiList(
            mdlist=[LayerStorage(self, 0, storage_type, self._data_type)]
        )

    def make_layered(self):
        if not self.layered:
            if self.data_structure_type != DataStructureType.ndarray:
                message = (
                    'Data structure type "{}" does not support '
                    "layered data.".format(self.data_structure_type)
                )
                type_, value_, traceback_ = sys.exc_info()
                raise MFDataException(
                    self.data_dimensions.structure.get_model(),
                    self.data_dimensions.structure.get_package(),
                    self.data_dimensions.structure.path,
                    "making data layered",
                    self.data_dimensions.structure.name,
                    inspect.stack()[0][3],
                    type_,
                    value_,
                    traceback_,
                    message,
                    self._simulation_data.debug,
                )
            if (
                self.layer_storage.first_item().data_storage_type
                == DataStorageType.external_file
            ):
                message = (
                    "Converting external file data into layered "
                    "data is currently not supported."
                )
                type_, value_, traceback_ = sys.exc_info()
                raise MFDataException(
                    self.data_dimensions.structure.get_model(),
                    self.data_dimensions.structure.get_package(),
                    self.data_dimensions.structure.path,
                    "making data layered",
                    self.data_dimensions.structure.name,
                    inspect.stack()[0][3],
                    type_,
                    value_,
                    traceback_,
                    message,
                    self._simulation_data.debug,
                )
            previous_storage = self.layer_storage.first_item()
            data = previous_storage.get_data()
            data_dim = self.get_data_dimensions(None)
            self.layer_storage = MultiList(
                shape=(data_dim[0],), callback=self._create_layer
            )
            if (
                previous_storage.data_storage_type
                == DataStorageType.internal_constant
            ):
                for storage in self.layer_storage.elements():
                    storage.data_const_value = (
                        previous_storage.data_const_value
                    )
            elif (
                previous_storage.data_storage_type
                == DataStorageType.internal_array
            ):
                data_ml = MultiList(data)
                if not (
                    data_ml.get_total_size()
                    == self.layer_storage.get_total_size()
                ):
                    message = (
                        "Size of data ({}) does not match expected "
                        "value of {}.".format(
                            data_ml.get_total_size(),
                            self.layer_storage.get_total_size(),
                        )
                    )
                    type_, value_, traceback_ = sys.exc_info()
                    raise MFDataException(
                        self.data_dimensions.structure.get_model(),
                        self.data_dimensions.structure.get_package(),
                        self.data_dimensions.structure.path,
                        "making data layered",
                        self.data_dimensions.structure.name,
                        inspect.stack()[0][3],
                        type_,
                        value_,
                        traceback_,
                        message,
                        self._simulation_data.debug,
                    )
                for data_layer, storage in zip(
                    data, self.layer_storage.elements()
                ):
                    storage.internal_data = data_layer
                    storage.factor = previous_storage.factor
                    storage.iprn = previous_storage.iprn
            self.layered = True

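# Illustration (editor's sketch, assumes a simple structured grid): layered
# storage splits the first (layer) axis of an array across LayerStorage
# objects, so a (nlay, nrow, ncol) array becomes nlay separate (nrow, ncol)
# blocks:
#
#     import numpy as np
#     arr = np.arange(2 * 3 * 4).reshape(2, 3, 4)  # (nlay, nrow, ncol)
#     per_layer = [arr[k] for k in range(arr.shape[0])]
#     assert per_layer[0].shape == (3, 4)
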
    def get_data_str(self, formal):
        data_str = ""
        # Assemble strings for internal array data
        for index, storage in enumerate(self.layer_storage.elements()):
            if self.layered:
                layer_str = f"Layer_{index + 1}"
            else:
                layer_str = ""
            if storage.data_storage_type == DataStorageType.internal_array:
                if storage.internal_data is not None:
                    header = self._get_layer_header_str(index)
                    if formal:
                        data_str = "{}{}{{{}}}\n({})\n".format(
                            data_str,
                            layer_str,
                            header,
                            repr(self.get_data((index,))),
                        )
                    else:
                        data_str = "{}{}{{{}}}\n({})\n".format(
                            data_str,
                            layer_str,
                            header,
                            str(self.get_data((index,))),
                        )
            elif (
                storage.data_storage_type
                == DataStorageType.internal_constant
            ):
                if storage.data_const_value is not None:
                    data_str = "{}{}{{{}}}\n".format(
                        data_str,
                        layer_str,
                        self._get_layer_header_str(index),
                    )
            elif storage.data_storage_type == DataStorageType.external_file:
                header = self._get_layer_header_str(index)
                data_str = "{}{}{{{}}}\n({})\n".format(
                    data_str,
                    layer_str,
                    header,
                    "External data not displayed",
                )
        return data_str

    def _get_layer_header_str(self, layer):
        header_list = []
        if (
            self.layer_storage[layer].data_storage_type
            == DataStorageType.external_file
        ):
            header_list.append(
                f"open/close {self.layer_storage[layer].fname}"
            )
        elif (
            self.layer_storage[layer].data_storage_type
            == DataStorageType.internal_constant
        ):
            lr = self.layer_storage[layer]
            header_list.append(f"constant {lr}")
        else:
            header_list.append("internal")
        if (
            self.layer_storage[layer].factor != 1.0
            and self.layer_storage[layer].factor != 1
            and self.data_structure_type != DataStructureType.recarray
        ):
            header_list.append(f"factor {self.layer_storage[layer].factor}")
        if self.layer_storage[layer].iprn is not None:
            header_list.append(f"iprn {self.layer_storage[layer].iprn}")
        if len(header_list) > 0:
            return ", ".join(header_list)
        else:
            return ""

    def init_layers(self, dimensions):
        self.layer_storage = MultiList(
            shape=dimensions, callback=self._create_layer
        )

    def add_layer(self, dimension=2):
        self.layer_storage.increment_dimension(dimension, self._create_layer)

    def override_data_type(self, index, data_type):
        self._data_type_overrides[index] = data_type

    def get_external_file_path(self, layer):
        if layer is None:
            return self.layer_storage[0].fname
        else:
            return self.layer_storage[layer].fname

    def get_const_val(self, layer=None):
        if layer is None:
            if not self.layer_storage.get_total_size() >= 1:
                message = "Can not get constant value. No data is available."
                type_, value_, traceback_ = sys.exc_info()
                raise MFDataException(
                    self.data_dimensions.structure.get_model(),
                    self.data_dimensions.structure.get_package(),
                    self.data_dimensions.structure.path,
                    "getting constant value",
                    self.data_dimensions.structure.name,
                    inspect.stack()[0][3],
                    type_,
                    value_,
                    traceback_,
                    message,
                    self._simulation_data.debug,
                )
            first_item = self.layer_storage.first_item()
            if (
                not first_item.data_storage_type
                == DataStorageType.internal_constant
            ):
                message = (
                    "Can not get constant value. Storage type must be "
                    "internal_constant."
                )
                type_, value_, traceback_ = sys.exc_info()
                raise MFDataException(
                    self.data_dimensions.structure.get_model(),
                    self.data_dimensions.structure.get_package(),
                    self.data_dimensions.structure.path,
                    "getting constant value",
                    self.data_dimensions.structure.name,
                    inspect.stack()[0][3],
                    type_,
                    value_,
                    traceback_,
                    message,
                    self._simulation_data.debug,
                )
            return first_item.get_data_const_val()
        else:
            if not self.layer_storage.in_shape(layer):
                message = (
                    'Can not get constant value. Layer "{}" is not a '
                    "valid layer.".format(layer)
                )
                type_, value_, traceback_ = sys.exc_info()
                raise MFDataException(
                    self.data_dimensions.structure.get_model(),
                    self.data_dimensions.structure.get_package(),
                    self.data_dimensions.structure.path,
                    "getting constant value",
                    self.data_dimensions.structure.name,
                    inspect.stack()[0][3],
                    type_,
                    value_,
                    traceback_,
                    message,
                    self._simulation_data.debug,
                )
            if (
                not self.layer_storage[layer].data_storage_type
                == DataStorageType.internal_constant
            ):
                message = (
                    "Can not get constant value. Storage type must be "
                    "internal_constant."
                )
                type_, value_, traceback_ = sys.exc_info()
                raise MFDataException(
                    self.data_dimensions.structure.get_model(),
                    self.data_dimensions.structure.get_package(),
                    self.data_dimensions.structure.path,
                    "getting constant value",
                    self.data_dimensions.structure.name,
                    inspect.stack()[0][3],
                    type_,
                    value_,
                    traceback_,
                    message,
                    self._simulation_data.debug,
                )
            return self.layer_storage[layer].get_data_const_val()

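# For reference (editor's note): an "internal_constant" layer corresponds to
# the MODFLOW 6 CONSTANT keyword in the package file, e.g. a layer of uniform
# hydraulic conductivity in an NPF GRIDDATA block would be written as:
#
#     K LAYERED
#       CONSTANT 25.0
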
    def get_record(self, layer=None):
        if layer is None:
            if self.layered:
                record_dict = {}
                for lay_num in self.layer_storage.indexes():
                    record_dict[lay_num[0]] = self._get_record_layer(lay_num)
                return record_dict
            else:
                return self._get_record_layer(0)
        else:
            return self._get_record_layer(layer)

    def _get_record_layer(self, lay_num):
        if (
            self.layer_storage[lay_num].data_storage_type
            == DataStorageType.external_file
        ):
            layer_dict = {
                "filename": self.layer_storage[lay_num].fname,
                "binary": self.layer_storage[lay_num].binary,
            }
        else:
            layer_dict = {}
        if self.data_structure_type == DataStructureType.ndarray:
            layer_dict["factor"] = self.layer_storage[lay_num].factor
            layer_dict["iprn"] = self.layer_storage[lay_num].iprn
        layer_dict["data"] = self.get_data(lay_num, False, False)
        return layer_dict

    def has_data(self, layer=None):
        ret_val = self._access_data(layer, False)
        return ret_val is not None and ret_val is not False

    def get_data(self, layer=None, apply_mult=True, block_exists=False):
        data = self._access_data(layer, True, apply_mult=apply_mult)
        if data is None and block_exists:
            return []
        else:
            return data

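# Usage sketch (editor's note, hedged): users normally reach this storage
# class through the public flopy data objects, whose get_data() calls route
# here. Variable names below are illustrative, not from this module:
#
#     # top = gwf.dis.top.get_data()                  # full ndarray
#     # spd = gwf.wel.stress_period_data.get_data(0)  # recarray, period 0
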
    def _access_data(self, layer, return_data=False, apply_mult=True):
        layer_check = self._resolve_layer(layer)
        if (
            self.layer_storage[layer_check].internal_data is None
            and self.layer_storage[layer_check].data_storage_type
            == DataStorageType.internal_array
        ) or (
            self.layer_storage[layer_check].data_const_value is None
            and self.layer_storage[layer_check].data_storage_type
            == DataStorageType.internal_constant
        ):
            return None
        if (
            layer is None
            and (
                self.data_structure_type == DataStructureType.ndarray
                or self.data_structure_type == DataStructureType.scalar
            )
            and return_data
        ):
            # return data from all layers
            data = self._build_full_data(apply_mult)
            if data is None:
                if (
                    self.layer_storage.first_item().data_storage_type
                    == DataStorageType.internal_constant
                ):
                    return self.layer_storage.first_item().get_data()[0]
            else:
                return data
        if (
            self.layer_storage[layer_check].data_storage_type
            == DataStorageType.external_file
        ):
            if return_data:
                return self.external_to_internal(layer, apply_mult=apply_mult)
            else:
                return True
        else:
            if (
                self.data_structure_type == DataStructureType.ndarray
                and self.layer_storage[layer_check].data_const_value is None
                and self.layer_storage[layer_check].internal_data is None
            ):
                return None
            if not (layer is None or self.layer_storage.in_shape(layer)):
                message = f'Layer "{layer}" is an invalid layer.'
                type_, value_, traceback_ = sys.exc_info()
                raise MFDataException(
                    self.data_dimensions.structure.get_model(),
                    self.data_dimensions.structure.get_package(),
                    self.data_dimensions.structure.path,
                    "accessing data",
                    self.data_dimensions.structure.name,
                    inspect.stack()[0][3],
                    type_,
                    value_,
                    traceback_,
                    message,
                    self._simulation_data.debug,
                )
            if layer is None:
                if (
                    self.data_structure_type == DataStructureType.ndarray
                    or self.data_structure_type == DataStructureType.scalar
                ):
                    if self.data_structure_type == DataStructureType.scalar:
                        return (
                            self.layer_storage.first_item().internal_data
                            is not None
                        )
                    check_storage = self.layer_storage[layer_check]
                    return (
                        check_storage.data_const_value is not None
                        and check_storage.data_storage_type
                        == DataStorageType.internal_constant
                    ) or (
                        check_storage.internal_data is not None
                        and check_storage.data_storage_type
                        == DataStorageType.internal_array
                    )
                else:
                    if (
                        self.layer_storage[layer_check].data_storage_type
                        == DataStorageType.internal_constant
                    ):
                        if return_data:
                            # recarray stored as a constant. currently only
                            # supports grid-based constant recarrays. build
                            # a recarray of all cells
                            data_list = []
                            model_grid = self.data_dimensions.get_model_grid()
                            structure = self.data_dimensions.structure
                            package_dim = self.data_dimensions.package_dim
                            for cellid in model_grid.get_all_model_cells():
                                first_item = self.layer_storage.first_item()
                                data_line = (cellid,) + (
                                    first_item.data_const_value,
                                )
                                if len(structure.data_item_structures) > 2:
                                    # append None for any expected optional
                                    # data
                                    for (
                                        data_item_struct
                                    ) in structure.data_item_structures[2:]:
                                        if (
                                            data_item_struct.name
                                            != "boundname"
                                            or package_dim.boundnames()
                                        ):
                                            data_line = data_line + (None,)
                                data_list.append(data_line)
                            type_list = self.resolve_typelist(data_list)
                            return np.rec.array(data_list, type_list)
                        else:
                            return (
                                self.layer_storage[
                                    layer_check
                                ].data_const_value
                                is not None
                            )
                    else:
                        if return_data:
                            return (
                                self.layer_storage.first_item().internal_data
                            )
                        else:
                            return True
            elif (
                self.layer_storage[layer].data_storage_type
                == DataStorageType.internal_array
            ):
                if return_data:
                    return self.layer_storage[layer].internal_data
                else:
                    return self.layer_storage[layer].internal_data is not None
            elif (
                self.layer_storage[layer].data_storage_type
                == DataStorageType.internal_constant
            ):
                layer_storage = self.layer_storage[layer]
                if return_data:
                    data = self._fill_const_layer(layer)
                    if data is None:
                        if (
                            layer_storage.data_storage_type
                            == DataStorageType.internal_constant
                        ):
                            return layer_storage.data_const_value[0]
                    else:
                        return data
                else:
                    return layer_storage.data_const_value is not None
            else:
                if return_data:
                    return self.get_external(layer)
                else:
                    return True

    def append_data(self, data):
        # currently only supports appending to recarrays
        if not (self.data_structure_type == DataStructureType.recarray):
            message = (
                'Can not append to data structure "{}". Can only '
                "append to a recarray data structure"
                ".".format(self.data_structure_type)
            )
            type_, value_, traceback_ = sys.exc_info()
            raise MFDataException(
                self.data_dimensions.structure.get_model(),
                self.data_dimensions.structure.get_package(),
                self.data_dimensions.structure.path,
                "appending data",
                self.data_dimensions.structure.name,
                inspect.stack()[0][3],
                type_,
                value_,
                traceback_,
                message,
                self._simulation_data.debug,
            )
        internal_data = self.layer_storage.first_item().internal_data
        if internal_data is None:
            if len(data[0]) != len(self._recarray_type_list):
                # rebuild type list using existing data as a guide
                self.build_type_list(data=data)
            self.set_data(np.rec.array(data, self._recarray_type_list))
        else:
            if len(self.layer_storage.first_item().internal_data[0]) < len(
                data[0]
            ):
                # Rebuild recarray to fit larger size
                count = 0
                last_count = len(data[0]) - len(internal_data[0])
                while count < last_count:
                    self._duplicate_last_item()
                    count += 1
                internal_data_list = internal_data.tolist()
                for data_item in data:
                    internal_data_list.append(data_item)
                self._add_placeholders(internal_data_list)
                self.set_data(
                    np.rec.array(internal_data_list, self._recarray_type_list)
                )
            else:
                first_item = self.layer_storage.first_item()
                if len(first_item.internal_data[0]) > len(data[0]):
                    # Add placeholders to data
                    self._add_placeholders(data)
                self.set_data(
                    np.hstack(
                        (
                            internal_data,
                            np.rec.array(data, self._recarray_type_list),
                        )
                    )
                )

    def set_data(
        self,
        data,
        layer=None,
        multiplier=None,
        key=None,
        autofill=False,
        check_data=False,
        preserve_record=False,
    ):
        if (
            self.data_structure_type == DataStructureType.recarray
            or self.data_structure_type == DataStructureType.scalar
        ):
            self._set_list(
                data,
                layer,
                multiplier,
                key,
                autofill,
                check_data,
                preserve_record,
            )
        else:
            self._set_array(
                data, layer, multiplier, key, autofill, preserve_record
            )

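# Usage sketch (editor's note, hedged): set_data() accepts several layouts
# for array data, all handled by _set_array/_set_list below: a plain
# ndarray, a {"factor": ..., "iprn": ..., "data": ...} dictionary, or a
# {"filename": ...} dictionary for external storage. Names are illustrative:
#
#     # gwf.npf.k.set_data(np.full((nlay, nrow, ncol), 10.0))
#     # gwf.npf.k.set_data({"factor": 2.0, "iprn": 1, "data": k_array})
#     # gwf.npf.k.set_data({"filename": "k.txt", "factor": 1.0})
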
    def _set_list(
        self,
        data,
        layer,
        multiplier,
        key,
        autofill,
        check_data=False,
        preserve_record=False,
    ):
        if isinstance(data, dict):
            if "filename" in data:
                if "binary" in data and data["binary"]:
                    if self.data_dimensions.package_dim.boundnames():
                        message = (
                            "Unable to store list data ({}) to a binary "
                            "file when using boundnames"
                            ".".format(self.data_dimensions.structure.name)
                        )
                        type_, value_, traceback_ = sys.exc_info()
                        raise MFDataException(
                            self.data_dimensions.structure.get_model(),
                            self.data_dimensions.structure.get_package(),
                            self.data_dimensions.structure.path,
                            "writing list data to binary file",
                            self.data_dimensions.structure.name,
                            inspect.stack()[0][3],
                            type_,
                            value_,
                            traceback_,
                            message,
                            self._simulation_data.debug,
                        )
                self.process_open_close_line(data, layer)
                return
            elif "data" in data:
                data = data["data"]
        if isinstance(data, list):
            if len(data) > 0 and (
                not PyListUtil.is_iterable(data[0])
                or isinstance(data[0], str)
            ):
                # single line of data needs to be encapsulated in a tuple
                data = [tuple(data)]
        if preserve_record:
            if (
                self.layer_storage[0].data_storage_type
                == DataStorageType.external_file
                and self.layer_storage[0].fname
            ):
                # build dictionary with current record data and
                # store externally
                data_record = {
                    "filename": self.layer_storage[0].fname,
                    "binary": self.layer_storage[0].binary,
                    "data": data,
                }
                self.process_open_close_line(data_record, layer)
                return
        self.store_internal(
            data,
            layer,
            False,
            multiplier,
            key=key,
            autofill=autofill,
            check_data=check_data,
            preserve_record=preserve_record,
        )

    def _set_array(
        self, data, layer, multiplier, key, autofill, preserve_record=False
    ):
        # make a list out of a single item
        if (
            isinstance(data, int)
            or isinstance(data, float)
            or isinstance(data, str)
        ):
            data = [data]

        success = False
        if preserve_record:
            if isinstance(data, np.ndarray):
                # try to store while preserving the structure of the
                # existing record
                if self.layer_storage.get_total_size() > 1:
                    if len(data) == self.layer_storage.get_total_size():
                        # break ndarray into layers and store
                        success = self._set_array_by_layer(
                            data, multiplier, key, preserve_record
                        )
                else:
                    # try to store as a single layer
                    success = self._set_array_layer(
                        data, layer, multiplier, key, preserve_record
                    )
            elif isinstance(data, dict):
                first_key = list(data.keys())[0]
                if isinstance(first_key, int):
                    for layer_num, data_layer in data.items():
                        success = self._set_array_layer(
                            data_layer,
                            layer_num,
                            multiplier,
                            key,
                            preserve_record,
                        )
        if not success:
            # storing while preserving the record failed, try storing as a
            # new record
            preserve_record = False
            # check for possibility of multi-layered data
            layer_num = 0
            if (
                layer is None
                and self.data_structure_type == DataStructureType.ndarray
                and len(data) == self.layer_storage.get_total_size()
                and not isinstance(data, dict)
            ):
                success = self._set_array_by_layer(
                    data, multiplier, key, preserve_record
                )
            if not success:
                # try to store as a single layer
                success = self._set_array_layer(
                    data, layer, multiplier, key, preserve_record
                )
        self.layered = bool(self.layer_storage.get_total_size() > 1)
        if not success:
            message = (
                'Unable to set data "{}" layer {}. Data is not '
                "in a valid format"
                ".".format(self.data_dimensions.structure.name, layer_num)
            )
            type_, value_, traceback_ = sys.exc_info()
            raise MFDataException(
                self.data_dimensions.structure.get_model(),
                self.data_dimensions.structure.get_package(),
                self.data_dimensions.structure.path,
                "setting array data",
                self.data_dimensions.structure.name,
                inspect.stack()[0][3],
                type_,
                value_,
                traceback_,
                message,
                self._simulation_data.debug,
            )

    def _set_array_by_layer(self, data, multiplier, key, preserve_record):
        # loop through list and try to store each list entry as a layer
        success = True
        for layer_num, layer_data in enumerate(data):
            if (
                not isinstance(layer_data, list)
                and not isinstance(layer_data, dict)
                and not isinstance(layer_data, np.ndarray)
            ):
                layer_data = [layer_data]
            layer_index = self.layer_storage.nth_index(layer_num)
            success = success and self._set_array_layer(
                layer_data, layer_index, multiplier, key, preserve_record
            )
        return success

    def _set_array_layer(self, data, layer, multiplier, key, preserve_record):
        # look for a single constant value
        data_type = self.data_dimensions.structure.get_datum_type(
            return_enum_type=True
        )
        if isinstance(data, np.ndarray) and preserve_record:
            # store data and preserve record
            layer = self._layer_prep(layer)
            if not self.layer_storage.in_shape(layer):
                return False
            if (
                self.layer_storage[layer].data_storage_type
                == DataStorageType.external_file
            ):
                self.store_external(
                    self.layer_storage[layer].fname,
                    layer,
                    self.layer_storage[layer].factor,
                    self.layer_storage[layer].iprn,
                    data=data,
                    do_not_verify=True,
                    binary=self.layer_storage[layer].binary,
                    preserve_record=preserve_record,
                )
            else:
                self.store_internal(
                    data,
                    layer,
                    False,
                    multiplier,
                    key=key,
                    preserve_record=preserve_record,
                )
            return True
        if not isinstance(data, dict) and not isinstance(data, str):
            if self._calc_data_size(data, 2) == 1 and self._is_type(
                data[0], data_type
            ):
                # store data as const
                self.store_internal(data, layer, True, multiplier, key=key)
                return True

        # look for internal and open/close data
        if isinstance(data, dict):
            data_la = None
            if "data" in data:
                if (
                    isinstance(data["data"], int)
                    or isinstance(data["data"], float)
                    or isinstance(data["data"], str)
                ):
                    # data should always be in a list/array
                    data["data"] = [data["data"]]
                data_la = data["data"]
            if "filename" in data:
                multiplier, iprn, binary = self.process_open_close_line(
                    data, layer
                )[0:3]
                # store location to file
                self.store_external(
                    data["filename"],
                    layer,
                    [multiplier],
                    print_format=iprn,
                    data=data_la,
                    binary=binary,
                    do_not_verify=True,
                )
                return True
            elif "data" in data:
                multiplier, iprn = self.process_internal_line(data)
                if len(data["data"]) == 1 and (
                    DatumUtil.is_float(data["data"][0])
                    or DatumUtil.is_int(data["data"][0])
                ):
                    # merge multiplier with single value and make constant
                    if DatumUtil.is_float(multiplier):
                        mult = 1.0
                    else:
                        mult = 1
                    self.store_internal(
                        [data["data"][0] * multiplier],
                        layer,
                        True,
                        [mult],
                        key=key,
                        print_format=iprn,
                    )
                else:
                    self.store_internal(
                        data["data"],
                        layer,
                        False,
                        [multiplier],
                        key=key,
                        print_format=iprn,
                    )
                return True
        elif isinstance(data[0], str):
            if data[0].lower() == "internal":
                multiplier, iprn = self.process_internal_line(data)
                self.store_internal(
                    data[-1],
                    layer,
                    False,
                    [multiplier],
                    key=key,
                    print_format=iprn,
                )
                return True
            elif data[0].lower() != "open/close":
                # assume open/close is just omitted
                new_data = data[:]
                new_data.insert(0, "open/close")
            else:
                new_data = data[:]
            self.process_open_close_line(new_data, layer, True)
            return True
        # try to resolve as internal array
        layer_storage = self.layer_storage[self._resolve_layer(layer)]
        if not (
            layer_storage.data_storage_type
            == DataStorageType.internal_constant
            and PyListUtil.has_one_item(data)
        ):
            # store data as is
            try:
                self.store_internal(data, layer, False, multiplier, key=key)
            except MFDataException:
                return False
            return True
        return False

    def get_active_layer_indices(self):
        layer_index = []
        for index in self.layer_storage.indexes():
            if (
                self.layer_storage[index].fname is not None
                or self.layer_storage[index].internal_data is not None
            ):
                layer_index.append(index)
        return layer_index

    def get_external(self, layer=None):
        if not (layer is None or self.layer_storage.in_shape(layer)):
            message = f'Can not get external data for layer "{layer}".'
            type_, value_, traceback_ = sys.exc_info()
            raise MFDataException(
                self.data_dimensions.structure.get_model(),
                self.data_dimensions.structure.get_package(),
                self.data_dimensions.structure.path,
                "getting external data",
                self.data_dimensions.structure.name,
                inspect.stack()[0][3],
                type_,
                value_,
                traceback_,
                message,
                self._simulation_data.debug,
            )

    def store_internal(
        self,
        data,
        layer=None,
        const=False,
        multiplier=None,
        key=None,
        autofill=False,
        print_format=None,
        check_data=False,
        preserve_record=False,
    ):
        if multiplier is None and layer is not None:
            layer = self._layer_prep(layer)
            if self.layer_storage.in_shape(layer) and preserve_record:
                multiplier = [self.layer_storage[layer].factor]
            else:
                multiplier = [self.get_default_mult()]
        if self.data_structure_type == DataStructureType.recarray:
            if (
                self.layer_storage.first_item().data_storage_type
                == DataStorageType.internal_constant
            ):
                self.layer_storage.first_item().data_const_value = data
            else:
                self.layer_storage.first_item().data_storage_type = (
                    DataStorageType.internal_array
                )
                if data is None or isinstance(data, np.recarray):
                    if not self.tuple_cellids(data):
                        # fix data so cellid is a single tuple
                        data = self.make_tuple_cellids(data.tolist())
                if data is None or isinstance(data, np.recarray):
                    if self._simulation_data.verify_data and check_data:
                        self._verify_list(data)
                    self.layer_storage.first_item().internal_data = data
                else:
                    if data is None:
                        self.set_data(None)
                    self.build_type_list()
                    if isinstance(data, list):
                        # look for single strings in list that describe
                        # multiple items
                        new_data = []
                        for item in data:
                            if isinstance(item, str):
                                # parse possible multi-item string
                                new_data.append(
                                    self._resolve_data_line(item, key)
                                )
                            else:
                                new_data.append(item)
                        data = new_data
                    if isinstance(data, str):
                        # parse possible multi-item string
                        data = [self._resolve_data_line(data, key)]
                    if (
                        data is not None
                        and check_data
                        and self._simulation_data.verify_data
                    ):
                        # check data line length
                        self._check_list_length(data)
                    if isinstance(data, np.recarray):
                        self.layer_storage.first_item().internal_data = data
                    elif autofill and data is not None:
                        if isinstance(data, tuple) and isinstance(
                            data[0], tuple
                        ):
                            # convert to list of tuples
                            data = list(data)
                        if isinstance(data, list) and DatumUtil.is_basic_type(
                            data[0]
                        ):
                            # this is a simple list, turn it into a tuple
                            # inside a list so that it is interpreted
                            # correctly by numpy.recarray
                            tupled_data = ()
                            for data_item in data:
                                tupled_data += (data_item,)
                            data = [tupled_data]
                        if not isinstance(data, list):
                            # put data in a list format for recarray
                            data = [(data,)]
                        # auto-fill tagged keyword
                        structure = self.data_dimensions.structure
                        data_item_structs = structure.data_item_structures
                        if (
                            data_item_structs[0].tagged
                            and not data_item_structs[0].type
                            == DatumType.keyword
                        ):
                            for data_index, data_entry in enumerate(data):
                                if (
                                    isinstance(data_entry[0], str)
                                    and data_entry[0].lower()
                                    == data_item_structs[0].name.lower()
                                ):
                                    break
                                data[data_index] = (
                                    data_item_structs[0].name.lower(),
                                ) + data[data_index]
                    if data is not None:
                        new_data = self._build_recarray(data, key, autofill)
                        self.layer_storage.first_item().internal_data = (
                            new_data
                        )
        elif self.data_structure_type == DataStructureType.scalar:
            if data == [()]:
                data = [(True,)]
            self.layer_storage.first_item().internal_data = data
        else:
            layer, multiplier = self._store_prep(layer, multiplier)
            dimensions = self.get_data_dimensions(layer)
            if preserve_record:
                factor = self.layer_storage[layer].factor
                adjustment = multiplier / factor
                if adjustment != 1.0:
                    # convert numbers to be multiplied by the
                    # original factor
                    data = data * adjustment
            if const:
                self.layer_storage[layer].data_storage_type = (
                    DataStorageType.internal_constant
                )
                self.layer_storage[layer].data_const_value = [
                    mfdatautil.get_first_val(data)
                ]
            else:
                self.layer_storage[layer].data_storage_type = (
                    DataStorageType.internal_array
                )
                try:
                    self.layer_storage[layer].internal_data = np.reshape(
                        data, dimensions
                    )
                except Exception:
                    message = (
                        'An error occurred when reshaping data "{}" to '
                        "store. Expected data dimensions: {}".format(
                            self.data_dimensions.structure.name, dimensions
                        )
                    )
                    type_, value_, traceback_ = sys.exc_info()
                    raise MFDataException(
                        self.data_dimensions.structure.get_model(),
                        self.data_dimensions.structure.get_package(),
                        self.data_dimensions.structure.path,
                        "setting array data",
                        self.data_dimensions.structure.name,
                        inspect.stack()[0][3],
                        type_,
                        value_,
                        traceback_,
                        message,
                        self._simulation_data.debug,
                    )
                data_type = self.data_dimensions.structure.get_datum_type(
                    True
                )
                dt = self.layer_storage[layer].internal_data.dtype
                if dt != data_type:
                    self.layer_storage[layer].internal_data = (
                        self.layer_storage[layer].internal_data.astype(
                            data_type
                        )
                    )
            if not preserve_record:
                self.layer_storage[layer].factor = multiplier
            self.layer_storage[layer].iprn = print_format

    def _resolve_data_line(self, data, key):
        if len(self._recarray_type_list) > 1:
            # add any missing leading keywords to the beginning of the string
            data_lst = data.strip().split()
            data_lst_updated = []
            struct = self.data_dimensions.structure
            for data_item_index, data_item in enumerate(
                struct.data_item_structures
            ):
                if data_item.type == DatumType.keyword:
                    if data_lst[0].lower() != data_item.name.lower():
                        data_lst_updated.append(data_item.name)
                    else:
                        data_lst_updated.append(data_lst.pop(0))
                else:
                    if (
                        struct.type == DatumType.record
                        and data_lst[0].lower() != data_item.name.lower()
                    ):
                        data_lst_updated.append(data_item.name)
                    break
            data_lst_updated += data_lst
            # parse the string as if it is being read from a package file
            file_access = MFFileAccessList(
                self.data_dimensions.structure,
                self.data_dimensions,
                self._simulation_data,
                self._data_path,
                self._stress_period,
            )
            data_loaded = []
            data_out = file_access.load_list_line(
                self,
                data_lst_updated,
                0,
                data_loaded,
                False,
                current_key=key,
                data_line=data,
                zero_based=True,
            )[1]
            return tuple(data_out)
        return data

    def _get_min_record_entries(self, data=None):
        try:
            if isinstance(data, dict) and "data" in data:
                data = data["data"]
            type_list = self.build_type_list(data=data, min_size=True)
        except Exception as ex:
            type_, value_, traceback_ = sys.exc_info()
            raise MFDataException(
                self.data_dimensions.structure.get_model(),
                self.data_dimensions.structure.get_package(),
                self.data_dimensions.structure.path,
                "getting min record entries",
                self.data_dimensions.structure.name,
                inspect.stack()[0][3],
                type_,
                value_,
                traceback_,
                None,
                self._simulation_data.debug,
                ex,
            )
        return len(type_list)

    def _check_line_size(self, data_line, min_line_size):
        if 0 < len(data_line) < min_line_size:
            message = (
                "Data line {} only has {} entries, "
                "minimum number of entries is "
                "{}.".format(data_line, len(data_line), min_line_size)
            )
            type_, value_, traceback_ = sys.exc_info()
            raise MFDataException(
                self.data_dimensions.structure.get_model(),
                self.data_dimensions.structure.get_package(),
                self.data_dimensions.structure.path,
                "storing data",
                self.data_dimensions.structure.name,
                inspect.stack()[0][3],
                type_,
                value_,
                traceback_,
                message,
                self._simulation_data.debug,
            )

    def _check_list_length(self, data_check):
        if iterable(data_check):
            # verify data length
            min_line_size = self._get_min_record_entries(data_check)
            if isinstance(data_check[0], np.record) or (
                iterable(data_check[0]) and not isinstance(data_check[0], str)
            ):
                # data contains multiple records
                for data_line in data_check:
                    self._check_line_size(data_line, min_line_size)
            else:
                self._check_line_size(data_check, min_line_size)

    def _build_recarray(self, data, key, autofill):
        if not mfdatautil.PyListUtil.is_iterable(data) or len(data) == 0:
            # set data to build empty recarray
            data = [()]
        self.build_type_list(data=data, key=key)
        if not self.tuple_cellids(data):
            # fix data so cellid is a single tuple
            data = self.make_tuple_cellids(data)
        if autofill and data is not None:
            # resolve any fields with data types that do not
            # agree with the expected type list
            self._resolve_multitype_fields(data)
        if isinstance(data, list):
            # data needs to be stored as tuples within a list.
            # if this is not the case try to fix it
            self._tupleize_data(data)
            # add placeholders to data so it agrees with
            # expected dimensions of recarray
            self._add_placeholders(data)
        try:
            new_data = np.rec.array(data, self._recarray_type_list)
        except Exception:
            data_expected = []
            for data_type in self._recarray_type_list:
                data_expected.append(f"<{data_type[0]}>")
            message = (
                "An error occurred when storing data "
                '"{}" in a recarray. {} data is a one '
                "or two dimensional list containing "
                'the variables "{}" (some variables '
                "may be optional, see MF6 "
                'documentation), but data "{}" was '
                "supplied.".format(
                    self.data_dimensions.structure.name,
                    self.data_dimensions.structure.name,
                    " ".join(data_expected),
                    data,
                )
            )
            type_, value_, traceback_ = sys.exc_info()
            raise MFDataException(
                self.data_dimensions.structure.get_model(),
                self.data_dimensions.structure.get_package(),
                self.data_dimensions.structure.path,
                "setting array data",
                self.data_dimensions.structure.name,
                inspect.stack()[0][3],
                type_,
                value_,
                traceback_,
                message,
                self._simulation_data.debug,
            )
        if self._simulation_data.verify_data:
            self._verify_list(new_data)
        return new_data

    def _get_cellid_size(self, data_item_name):
        model_num = DatumUtil.cellid_model_num(
            data_item_name,
            self.data_dimensions.structure.model_data,
            self.data_dimensions.package_dim.model_dim,
        )
        model_grid = self.data_dimensions.get_model_grid(model_num=model_num)
        return model_grid.get_num_spatial_coordinates()

    def make_tuple_cellids(self, data):
        # convert cellids from individual layer, row, column fields into
        # tuples (layer, row, column)
        new_data = []
        current_cellid = ()
        for line in data:
            data_idx = 0
            new_line = []
            for item, is_cellid in zip(line, self.recarray_cellid_list_ex):
                if is_cellid:
                    cellid_size = self._get_cellid_size(
                        self._recarray_type_list[data_idx][0],
                    )
                    current_cellid += (item,)
                    if len(current_cellid) == cellid_size:
                        new_line.append(current_cellid)
                        current_cellid = ()
                else:
                    new_line.append(item)
                data_idx += 1
            new_data.append(tuple(new_line))
        return new_data

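# Example (editor's sketch): make_tuple_cellids collapses cellid columns that
# arrive as separate integers back into a single tuple. For a structured
# grid (cellid_size == 3), a record like
#
#     (1, 2, 3, 55.5)        # layer, row, column, head
#
# becomes
#
#     ((1, 2, 3), 55.5)      # (cellid tuple, head)
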
    def tuple_cellids(self, data):
        if data is None or len(data) == 0:
            return True
        for data_entry, cellid in zip(data[0], self.recarray_cellid_list):
            if cellid:
                if (
                    isinstance(data_entry, int)
                    or isinstance(data_entry, np.int32)
                    or isinstance(data_entry, np.int64)
                ):
                    # cellid is stored in separate columns in the recarray
                    # (e.g., one column for layer, one column for row, and
                    # one column for column)
                    return False
                else:
                    # cellid is stored in a single column in the recarray
                    # as a tuple
                    return True
        return True

    def resolve_typelist(self, data):
        if self.tuple_cellids(data):
            return self._recarray_type_list
        else:
            return self._recarray_type_list_ex

    def resolve_cellidlist(self, data):
        if self.tuple_cellids(data):
            return self.recarray_cellid_list
        else:
            return self.recarray_cellid_list_ex

    def _resolve_multitype_fields(self, data):
        # find any data fields where the data is not a consistent type
        itype_len = len(self._recarray_type_list)
        for data_entry in data:
            for index, data_val in enumerate(data_entry):
                if (
                    index < itype_len
                    and self._recarray_type_list[index][1] != object
                    and not isinstance(
                        data_val, self._recarray_type_list[index][1]
                    )
                    and (
                        not isinstance(data_val, int)
                        or self._recarray_type_list[index][1] != float
                    )
                    and (
                        self._recarray_type_list[index][1] != float
                        or not isinstance(data_val, np.floating)
                    )
                ):
                    # for inconsistent types use generic object type
                    self._recarray_type_list[index] = (
                        self._recarray_type_list[index][0],
                        object,
                    )

    def store_external(
        self,
        file_path,
        layer=None,
        multiplier=None,
        print_format=None,
        data=None,
        do_not_verify=False,
        binary=False,
        preserve_record=False,
    ):
        if multiplier is None and layer is not None:
            layer = self._layer_prep(layer)
            if self.layer_storage.in_shape(layer) and preserve_record:
                multiplier = [self.layer_storage[layer].factor]
            else:
                multiplier = [self.get_default_mult()]
        layer_new, multiplier = self._store_prep(layer, multiplier)

        # pathing to external file
        data_dim = self.data_dimensions
        model_name = data_dim.package_dim.model_dim[0].model_name
        fp_relative = file_path
        if model_name is not None and fp_relative is not None:
            rel_path = self._simulation_data.mfpath.model_relative_path[
                model_name
            ]
            if rel_path is not None and len(rel_path) > 0 and rel_path != ".":
                # include model relative path in external file path
                # only if model relative path is not already in external
                # file path, i.e. when reading
                fp_rp_l = fp_relative.split(os.path.sep)
                rp_l_r = rel_path.split(os.path.sep)[::-1]
                for i, rp in enumerate(rp_l_r):
                    if rp != fp_rp_l[len(rp_l_r) - i - 1]:
                        fp_relative = os.path.join(rp, fp_relative)
            fp = self._simulation_data.mfpath.resolve_path(
                fp_relative, model_name
            )
        else:
            fp = os.path.join(
                self._simulation_data.mfpath.get_sim_path(), fp_relative
            )
        if layer_new in self.layer_storage:
            old_ext_file = self.layer_storage[layer_new].external
            old_binary = self.layer_storage[layer_new].binary
            if preserve_record and old_ext_file:
                # use old file settings
                fp = old_ext_file
                binary = old_binary

        if data is not None:
            if self.data_structure_type == DataStructureType.recarray:
                # create external file and write file entry to the file;
                # store data internally first so that a file entry
                # can be generated
                self.store_internal(
                    data,
                    layer_new,
                    False,
                    [multiplier],
                    None,
                    False,
                    print_format,
                    preserve_record,
                )
                if binary:
                    file_access = MFFileAccessList(
                        self.data_dimensions.structure,
                        self.data_dimensions,
                        self._simulation_data,
                        self._data_path,
                        self._stress_period,
                    )
                    file_access.write_binary_file(
                        self.layer_storage.first_item().internal_data,
                        fp,
                        self._model_or_sim.modeldiscrit,
                        precision="double",
                    )
                else:
                    # make sure folder exists
                    file_path = os.path.split(fp)[0]
                    if not os.path.exists(file_path):
                        os.makedirs(file_path)
                    # create file
                    try:
                        fd = open(fp, "w")
                    except Exception:
                        message = (
                            "Unable to open file {}. Make sure the "
                            "file is not locked and the folder exists"
                            ".".format(fp)
                        )
                        type_, value_, traceback_ = sys.exc_info()
                        raise MFDataException(
                            self.data_dimensions.structure.get_model(),
                            self.data_dimensions.structure.get_package(),
                            self.data_dimensions.structure.path,
                            "opening external file for writing",
                            data_dim.structure.name,
                            inspect.stack()[0][3],
                            type_,
                            value_,
                            traceback_,
                            message,
                            self._simulation_data.debug,
                        )
                    ext_file_entry = self._get_file_entry()
                    fd.write(ext_file_entry)
                    fd.close()
                # set as external data
                self.layer_storage.first_item().internal_data = None
            else:
                # if self.layer_storage.in_shape(layer_new):
                #     factor = self.layer_storage[layer_new].factor
                #     if preserve_record:
                #         adjustment = multiplier / factor
                #         if adjustment != 1.0:
                #             # convert numbers to be multiplied by the
                #             # original factor
                #             data = data * adjustment
                # store data externally in file
                data_size = self.get_data_size(layer_new)
                data_type = data_dim.structure.data_item_structures[0].type
                if self._calc_data_size(data, 2) == 1 and data_size > 1:
                    # constant data, need to expand
                    self.layer_storage[layer_new].data_const_value = data
                    self.layer_storage[layer_new].data_storage_type = (
                        DataStorageType.internal_constant
                    )
                    data = self._fill_const_layer(layer)
                elif isinstance(data, list):
                    data = self._to_ndarray(data, layer)
                if binary:
                    text = self.data_dimensions.structure.name
                    file_access = MFFileAccessArray(
                        self.data_dimensions.structure,
                        self.data_dimensions,
                        self._simulation_data,
                        self._data_path,
                        self._stress_period,
                    )
                    str_layered = self.data_dimensions.structure.layered
                    file_access.write_binary_file(
                        data,
                        fp,
                        text,
                        self._model_or_sim.modeldiscrit,
                        self._model_or_sim.modeltime,
                        stress_period=self._stress_period,
                        precision="double",
                        write_multi_layer=(layer is None and str_layered),
                    )
                else:
                    file_access = MFFileAccessArray(
                        self.data_dimensions.structure,
                        self.data_dimensions,
                        self._simulation_data,
                        self._data_path,
                        self._stress_period,
                    )
                    file_access.write_text_file(
                        data,
                        fp,
                        data_type,
                        data_size,
                    )
                if not preserve_record:
                    self.layer_storage[layer_new].factor = multiplier
                self.layer_storage[layer_new].internal_data = None
                self.layer_storage[layer_new].data_const_value = None
        else:
            if self.data_structure_type == DataStructureType.recarray:
                self.layer_storage.first_item().internal_data = None
            else:
                self.layer_storage[layer_new].factor = multiplier
                self.layer_storage[layer_new].internal_data = None
        self.set_ext_file_attributes(
            layer_new, fp_relative, print_format, binary
        )

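# Usage sketch (editor's note, hedged): users normally reach store_external()
# through the public flopy interface, for example via a method such as
# store_as_external_file() on a package data object; the names below are
# illustrative, not defined in this module:
#
#     # gwf.dis.botm.store_as_external_file("botm.txt")
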
    def set_ext_file_attributes(self, layer, file_path, print_format, binary):
        # point to the external file and set flags
        self.layer_storage[layer].fname = file_path
        self.layer_storage[layer].iprn = print_format
        self.layer_storage[layer].binary = binary
        self.layer_storage[layer].data_storage_type = (
            DataStorageType.external_file
        )

    def point_to_existing_external_file(self, arr_line, layer):
        (
            multiplier,
            print_format,
            binary,
            data_file,
        ) = self.process_open_close_line(arr_line, layer, store=False)
        self.set_ext_file_attributes(layer, data_file, print_format, binary)
        self.layer_storage[layer].factor = multiplier

    def external_to_external(
        self, new_external_file, multiplier=None, layer=None, binary=None
    ):
        # currently only supports files containing ndarrays
        if not (self.data_structure_type == DataStructureType.ndarray):
            message = (
                'Can not copy external file of type "{}". Only '
                "files containing ndarrays are currently supported"
                ".".format(self.data_structure_type)
            )
            type_, value_, traceback_ = sys.exc_info()
            raise MFDataException(
                self.data_dimensions.structure.get_model(),
                self.data_dimensions.structure.get_package(),
                self.data_dimensions.structure.path,
                "copy external file",
                self.data_dimensions.structure.name,
                inspect.stack()[0][3],
                type_,
                value_,
                traceback_,
                message,
                self._simulation_data.debug,
            )
        if not (
            (layer is None and self.layer_storage.get_total_size() == 1)
            or (layer is not None and self.layer_storage.in_shape(layer))
        ):
            if layer is None:
                message = (
                    "When no layer is supplied the data must contain "
                    "only one layer. Data contains {} layers"
                    ".".format(self.layer_storage.get_total_size())
                )
            else:
                message = f'layer "{layer}" is not a valid layer'
            type_, value_, traceback_ = sys.exc_info()
            raise MFDataException(
                self.data_dimensions.structure.get_model(),
                self.data_dimensions.structure.get_package(),
                self.data_dimensions.structure.path,
                "copy external file",
                self.data_dimensions.structure.name,
                inspect.stack()[0][3],
                type_,
                value_,
                traceback_,
                message,
                self._simulation_data.debug,
            )
        # get data storage
        if layer is None:
            layer = 1
        if self.layer_storage[layer].fname is None:
            message = f"No file name exists for layer {layer}."
            type_, value_, traceback_ = sys.exc_info()
            raise MFDataException(
                self.data_dimensions.structure.get_model(),
                self.data_dimensions.structure.get_package(),
                self.data_dimensions.structure.path,
                "copy external file",
                self.data_dimensions.structure.name,
                inspect.stack()[0][3],
                type_,
                value_,
                traceback_,
                message,
                self._simulation_data.debug,
            )
        # copy file to new location
        copyfile(self.layer_storage[layer].fname, new_external_file)
        # update
        if binary is None:
            binary = self.layer_storage[layer].binary
        self.store_external(
            new_external_file,
            layer,
            [self.layer_storage[layer].factor],
            self.layer_storage[layer].iprn,
            binary=binary,
        )

    def external_to_internal(
        self, layer, store_internal=False, apply_mult=True
    ):
        # reset comments
        self.pre_data_comments = None
        self.comments = {}
        if layer is None:
            layer = 0
        # load data from external file
        model_name = self.data_dimensions.package_dim.model_dim[0].model_name
        read_file = self._simulation_data.mfpath.resolve_path(
            self.layer_storage[layer].fname, model_name
        )
        # currently supports files containing ndarrays or recarrays
        if self.data_structure_type == DataStructureType.ndarray:
            file_access = MFFileAccessArray(
                self.data_dimensions.structure,
                self.data_dimensions,
                self._simulation_data,
                self._data_path,
                self._stress_period,
            )
            if self.layer_storage[layer].binary:
                data_out = file_access.read_binary_data_from_file(
                    read_file,
                    self.get_data_dimensions(layer),
                    self.get_data_size(layer),
                    self._data_type,
                    self._model_or_sim.modeldiscrit,
                )[0]
            else:
                data_out = file_access.read_text_data_from_file(
                    self.get_data_size(layer),
                    self._data_type,
                    self.get_data_dimensions(layer),
                    layer,
                    self.layered,
                    read_file,
                )[0]
            if apply_mult and self.layer_storage[layer].factor is not None:
                data_out = data_out * self.layer_storage[layer].factor
            if store_internal:
                self.store_internal(data_out, layer)
            return data_out
        elif self.data_structure_type == DataStructureType.recarray:
            file_access = MFFileAccessList(
                self.data_dimensions.structure,
                self.data_dimensions,
                self._simulation_data,
                self._data_path,
                self._stress_period,
            )
            if self.layer_storage[layer].binary:
                data = file_access.read_binary_data_from_file(
                    read_file, self._model_or_sim.modeldiscrit
                )
                data_out = self._build_recarray(data, layer, False)
            else:
                with open(read_file) as fd_read_file:
                    data_out = file_access.read_list_data_from_file(
                        fd_read_file,
                        self,
                        self._stress_period,
                        store_internal=False,
                    )
            if store_internal:
                self.store_internal(data_out, layer)
            return data_out
        else:
            path = self.data_dimensions.structure.path
            message = (
                "Can not convert {} to internal data. External to "
                "internal file operations currently only supported "
                "for ndarrays.".format(path[-1])
            )
            type_, value_, traceback_ = sys.exc_info()
            raise MFDataException(
                self.data_dimensions.structure.get_model(),
                self.data_dimensions.structure.get_package(),
                self.data_dimensions.structure.path,
                "opening external file for writing",
                self.data_dimensions.structure.name,
                inspect.stack()[0][3],
                type_,
                value_,
                traceback_,
                message,
                self._simulation_data.debug,
            )

    def internal_to_external(
        self,
        new_external_file,
        multiplier=None,
        layer=None,
        print_format=None,
        binary=False,
    ):
        if layer is None:
            layer_item = self.layer_storage.first_item()
        else:
            layer_item = self.layer_storage[layer]
        if layer_item.data_storage_type == DataStorageType.internal_array:
            data = layer_item.internal_data
        else:
            data = self._fill_const_layer(layer)
        self.store_external(
            new_external_file,
            layer,
            multiplier,
            print_format,
            data,
            binary=binary,
        )

    def resolve_shape_list(
        self,
        data_item,
        repeat_count,
        current_key,
        data_line,
        cellid_size=None,
    ):
        struct = self.data_dimensions.structure
        try:
            resolved_shape, shape_rule = self.data_dimensions.get_data_shape(
                data_item, struct, data_line, repeating_key=current_key
            )
        except Exception as se:
            comment = (
                'Unable to resolve shape for data "{}" field "{}"'
                ".".format(struct.name, data_item.name)
            )
            type_, value_, traceback_ = sys.exc_info()
            raise MFDataException(
                struct.get_model(),
                struct.get_package(),
                struct.path,
                "loading data list from package file",
                struct.name,
                inspect.stack()[0][3],
                type_,
                value_,
                traceback_,
                comment,
                self._simulation_data.debug,
                se,
            )
        if cellid_size is not None:
            data_item.remove_cellid(resolved_shape, cellid_size)
        if len(resolved_shape) == 1:
            if repeat_count < resolved_shape[0]:
                return True, shape_rule is not None
            elif resolved_shape[0] == -9999:
                # repeating unknown number of times in 1-D array
                return False, True
        return False, False

    def _validate_cellid(self, arr_line, data_index, data_item):
        if not self.data_dimensions.structure.model_data:
            # not model data, so this is not a cellid
            return False
        if arr_line is None:
            return False
        cellid_size = self._get_cellid_size(data_item.name)
        model_grid = self.data_dimensions.get_model_grid()
        if cellid_size + data_index > len(arr_line):
            return False
        for index, dim_size in zip(
            range(data_index, cellid_size + data_index),
            model_grid.get_model_dim(),
        ):
            if not DatumUtil.is_int(arr_line[index]):
                return False
            val = int(arr_line[index])
            if val <= 0 or val > dim_size:
                return False
        return True

    def add_data_line_comment(self, comment, line_num):
        if line_num in self.comments:
            self.comments[line_num].add_text("\n")
            self.comments[line_num].add_text(" ".join(comment))
        else:
            self.comments[line_num] = MFComment(
                " ".join(comment),
                self.data_dimensions.structure.path,
                self._simulation_data,
                line_num,
            )

    def process_internal_line(self, arr_line):
        multiplier = self.get_default_mult()
        print_format = None
        if isinstance(arr_line, list):
            index = 1
            while index < len(arr_line):
                if isinstance(arr_line[index], str):
                    word = arr_line[index].lower()
                    if word == "factor" and index + 1 < len(arr_line):
                        multiplier = convert_data(
                            arr_line[index + 1],
                            self.data_dimensions,
                            self._data_type,
                        )
                        index += 2
                    elif word == "iprn" and index + 1 < len(arr_line):
                        print_format = arr_line[index + 1]
                        index += 2
                    else:
                        break
                else:
                    break
        elif isinstance(arr_line, dict):
            for key, value in arr_line.items():
                if key.lower() == "factor":
                    multiplier = convert_data(
                        value, self.data_dimensions, self._data_type
                    )
                if key.lower() == "iprn":
                    print_format = value
        return multiplier, print_format

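# For reference (editor's note): process_internal_line() parses the keywords
# that follow INTERNAL on a MODFLOW 6 array control line, e.g.:
#
#     INTERNAL FACTOR 1.0 IPRN 3
#       1.0 2.0 3.0
#       4.0 5.0 6.0
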
    def process_open_close_line(self, arr_line, layer, store=True):
        data_dim = self.data_dimensions
        (
            multiplier,
            print_format,
            binary,
            data_file,
            data,
            comment,
        ) = mfdatautil.process_open_close_line(
            arr_line,
            data_dim,
            self._data_type,
            self._simulation_data.debug,
            store,
        )
        if comment is not None:
            self.layer_storage[layer].comments = MFComment(
                comment,
                self.data_dimensions.structure.path,
                self._simulation_data,
                layer,
            )
        if data_file is None:
            message = (
                "Data array {} contains an OPEN/CLOSE without a "
                "fname (file name) specified"
                ".".format(data_dim.structure.name)
            )
            type_, value_, traceback_ = sys.exc_info()
            raise MFDataException(
                self.data_dimensions.structure.get_model(),
                self.data_dimensions.structure.get_package(),
                self.data_dimensions.structure.path,
                "processing open/close line",
                data_dim.structure.name,
                inspect.stack()[0][3],
                type_,
                value_,
                traceback_,
                message,
                self._simulation_data.debug,
            )
        if store:
            # store external info
            self.store_external(
                data_file,
                layer,
                [multiplier],
                print_format,
                binary=binary,
                data=data,
            )
        # add to active list of external files
        model_name = data_dim.package_dim.model_dim[0].model_name
        self._simulation_data.mfpath.add_ext_file(data_file, model_name)
        return multiplier, print_format, binary, data_file

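# For reference (editor's note): process_open_close_line() handles MODFLOW 6
# control lines of the form
#
#     OPEN/CLOSE bottom.txt FACTOR 1.0 IPRN 0
#     OPEN/CLOSE heads.bin (BINARY)
#
# storing the multiplier, print format, and binary flag on the target layer.
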
    @staticmethod
    def _tupleize_data(data):
        for index, data_line in enumerate(data):
            if not isinstance(data_line, tuple):
                if isinstance(data_line, list):
                    data[index] = tuple(data_line)
                else:
                    data[index] = (data_line,)

    def _verify_list(self, data):
        if data is not None:
            model_grid = None
            cellid_size = None
            for data_line in data:
                data_line_len = len(data_line)
                for index in range(
                    0, min(data_line_len, len(self._recarray_type_list))
                ):
                    datadim = self.data_dimensions
                    if (
                        self._recarray_type_list[index][0] == "cellid"
                        and datadim.get_model_dim(None).model_name is not None
                        and data_line[index] is not None
                    ):
                        # this is a cellid. verify that it contains the
                        # correct number of integers
                        if cellid_size is None:
                            model_grid = datadim.get_model_grid()
                            cellid_size = self._get_cellid_size(
                                self._recarray_type_list[index][0]
                            )
                        # bare integers (flat node numbers) have no length,
                        # so exclude them before calling len()
                        if (
                            cellid_size != 1
                            and not isinstance(data_line[index], int)
                            and len(data_line[index]) != cellid_size
                        ):
                            message = (
                                'Cellid "{}" contains {} integer(s). '
                                "Expected a cellid containing {} "
                                "integer(s) for grid type"
                                " {}.".format(
                                    data_line[index],
                                    len(data_line[index]),
                                    cellid_size,
                                    str(model_grid.grid_type()),
                                )
                            )
                            type_, value_, traceback_ = sys.exc_info()
                            raise MFDataException(
                                self.data_dimensions.structure.get_model(),
                                self.data_dimensions.structure.get_package(),
                                self.data_dimensions.structure.path,
                                "verifying cellid",
                                self.data_dimensions.structure.name,
                                inspect.stack()[0][3],
                                type_,
                                value_,
                                traceback_,
                                message,
                                self._simulation_data.debug,
                            )

    def _add_placeholders(self, data):
        for idx, data_line in enumerate(data):
            data_line_len = len(data_line)
            if data_line_len < len(self._recarray_type_list):
                for index in range(
                    data_line_len, len(self._recarray_type_list)
                ):
                    if self._recarray_type_list[index][1] == int:
                        self._recarray_type_list[index] = (
                            self._recarray_type_list[index][0],
                            object,
                        )
                        data_line += (None,)
                    elif self._recarray_type_list[index][1] == float:
                        data_line += (np.nan,)
                    else:
                        data_line += (None,)
                data[idx] = data_line
            elif data_line_len > len(self._recarray_type_list):
                for index in range(
                    len(self._recarray_type_list), data_line_len
                ):
                    if data_line[-1] is None:
                        dl = list(data_line)
                        del dl[-1]
                        data_line = tuple(dl)
                        data[idx] = data_line

    def _duplicate_last_item(self):
        last_item = self._recarray_type_list[-1]
        arr_item_name = last_item[0].split("_")
        if DatumUtil.is_int(arr_item_name[-1]):
            new_item_num = int(arr_item_name[-1]) + 1
            new_item_name = "_".join(arr_item_name[0:-1])
            new_item_name = f"{new_item_name}_{new_item_num}"
        else:
            new_item_name = f"{last_item[0]}_1"
        self._recarray_type_list.append((new_item_name, last_item[1]))

    def _build_full_data(self, apply_multiplier=False):
        if self.data_structure_type == DataStructureType.scalar:
            return self.layer_storage.first_item().internal_data
        dimensions = self.get_data_dimensions(None)
        if dimensions[0] < 0:
            # dimensions can not be determined from the dfn file, use
            # the size of the data provided as the dimensions
            dimensions = [self.layer_storage.get_total_size()]
        all_none = True
        np_data_type = self.data_dimensions.structure.get_datum_type()
        np_full_data_type = self.data_dimensions.structure.get_datum_type(
            True
        )
        if np.issubdtype(np_full_data_type, np.floating):
            fill_value = np.nan
        elif np.issubdtype(np_full_data_type, np.integer):
            fill_value = 0
        elif np.issubdtype(np_full_data_type, np.bool_):
            fill_value = False
        else:
            fill_value = None
        full_data = np.full(dimensions, fill_value, np_full_data_type)
        is_aux = self.data_dimensions.structure.name == "aux"
        if is_aux:
            aux_data = []
        if not self.layered:
            layers_to_process = [0]
        else:
            layers_to_process = self.layer_storage.indexes()
        for layer in layers_to_process:
            if (
                self.layer_storage[layer].factor is not None
                and apply_multiplier
            ):
                mult = self.layer_storage[layer].factor
            elif self._data_type == DatumType.integer:
                mult = 1
            else:
                mult = 1.0
            if (
                self.layer_storage[layer].data_storage_type
                == DataStorageType.internal_array
            ):
                if self.layer_storage[layer].internal_data is None or (
                    len(self.layer_storage[layer].internal_data) > 0
                    and self.layer_storage[layer].internal_data[0] is None
                ):
                    if is_aux:
                        full_data = None
                    else:
                        return None
                elif (
                    self.layer_storage.get_total_size() == 1
                    or not self.layered
                    or not self._has_layer_dim()
                ):
                    full_data = self.layer_storage[layer].internal_data * mult
                else:
                    full_data[layer] = (
                        self.layer_storage[layer].internal_data * mult
                    )
            elif (
                self.layer_storage[layer].data_storage_type
                == DataStorageType.internal_constant
            ):
                if (
                    self.layer_storage.get_total_size() == 1
                    or not self.layered
                    or not self._has_layer_dim()
                ):
                    full_data = self._fill_const_layer(layer) * mult
                else:
                    full_data[layer] = self._fill_const_layer(layer) * mult
            else:
                file_access = MFFileAccessArray(
                    self.data_dimensions.structure,
                    self.data_dimensions,
                    self._simulation_data,
                    self._data_path,
                    self._stress_period,
                )
                model_name = self.data_dimensions.package_dim.model_dim[
                    0
                ].model_name
                read_file = self._simulation_data.mfpath.resolve_path(
                    self.layer_storage[layer].fname, ""
                )
                if self.layer_storage[layer].binary:
                    data_out = (
                        file_access.read_binary_data_from_file(
                            read_file,
                            self.get_data_dimensions(layer),
                            self.get_data_size(layer),
                            self._data_type,
                            self._model_or_sim.modeldiscrit,
                            False,
                        )[0]
                        * mult
                    )
                else:
                    data_out = (
                        file_access.read_text_data_from_file(
                            self.get_data_size(layer),
                            np_data_type,
                            self.get_data_dimensions(layer),
                            layer,
                            self.layered,
                            read_file,
                        )[0]
                        * mult
                    )
                if (
                    self.layer_storage.get_total_size() == 1
                    or not self.layered
                ):
                    full_data = data_out
                else:
                    if is_aux and full_data.shape == data_out.shape:
                        full_data = data_out
                    else:
                        full_data[layer] = data_out
            if is_aux:
                if full_data is not None:
                    all_none = False
                aux_data.append(full_data)
                full_data = np.full(
                    dimensions,
                    np.nan,
                    self.data_dimensions.structure.get_datum_type(True),
                )
        if is_aux:
            if all_none:
                return None
            else:
                return np.stack(aux_data, axis=0)
        else:
            return full_data

    def _resolve_layer(self, layer):
        if layer is None:
            return self.layer_storage.first_index()
        else:
            return layer

    def _to_ndarray(self, data, layer):
        data_dimensions = self.get_data_dimensions(layer)
        data_iter = MultiListIter(data)
        return self._fill_dimensions(data_iter, data_dimensions)

    def _fill_const_layer(self, layer):
        data_dimensions = self.get_data_dimensions(layer)
        if layer is None:
            ls = self.layer_storage.first_item()
        else:
            ls = self.layer_storage[layer]
        if data_dimensions[0] < 0:
            return ls.data_const_value[0]
        else:
            data_type = self.data_dimensions.structure.get_datum_type(
                numpy_type=True
            )
            return np.full(data_dimensions, ls.data_const_value[0], data_type)

    def _is_type(self, data_item, data_type):
        if data_type == DatumType.string or data_type == DatumType.keyword:
            return True
        elif data_type == DatumType.integer:
            return DatumUtil.is_int(data_item)
        elif data_type == DatumType.double_precision:
            return DatumUtil.is_float(data_item)
        elif data_type == DatumType.keystring:
            # TODO: support keystring type
            if (
                self._simulation_data.verbosity_level.value
                >= VerbosityLevel.normal.value
            ):
                print("Keystring type currently not supported.")
            return True
        else:
            if (
                self._simulation_data.verbosity_level.value
                >= VerbosityLevel.normal.value
            ):
                print(f"{data_type} type checking currently not supported")
            return True

    def _fill_dimensions(self, data_iter, dimensions):
        if self.data_structure_type == DataStructureType.ndarray:
            np_dtype = MFFileAccess.datum_to_numpy_type(self._data_type)[0]
            # initialize array
            data_array = np.ndarray(shape=dimensions, dtype=np_dtype)
            # fill array. ndarray.itemset was removed in NumPy 2.0; plain
            # item assignment is the equivalent
            for index in ArrayIndexIter(dimensions):
                data_array[index] = next(data_iter)
            return data_array
        elif self.data_structure_type == DataStructureType.scalar:
            return next(data_iter)
        else:
            data_array = None
            data_line = ()
            # fill array
            array_index_iter = ArrayIndexIter(dimensions)
            current_col = 0
            for index in array_index_iter:
                data_line += (index,)
                if current_col == dimensions[1] - 1:
                    try:
                        if data_array is None:
                            data_array = np.rec.array(
                                data_line, self._recarray_type_list
                            )
                        else:
                            rec_array = np.rec.array(
                                data_line, self._recarray_type_list
                            )
                            data_array = np.hstack((data_array, rec_array))
                    except Exception:
                        message = (
                            "An error occurred when storing data "
                            '"{}" in a recarray. Data line being '
                            "stored: {}".format(
                                self.data_dimensions.structure.name,
                                data_line,
                            )
                        )
                        type_, value_, traceback_ = sys.exc_info()
                        raise MFDataException(
                            self.data_dimensions.structure.get_model(),
                            self.data_dimensions.structure.get_package(),
                            self.data_dimensions.structure.path,
                            "storing data in recarray",
                            self.data_dimensions.structure.name,
                            inspect.stack()[0][3],
                            type_,
                            value_,
                            traceback_,
                            message,
                            self._simulation_data.debug,
                        )
                    current_col = 0
                    data_line = ()
                data_array[index] = next(data_iter)
            return data_array
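# Minimal numpy illustration of the append pattern used in the recarray
# branch of _fill_dimensions above: each completed line becomes a one-row
# recarray that is stacked onto the result. The field names and values are
# assumptions for illustration.
import numpy as np

rec_type_list = [("cellid", object), ("q", float)]
data_array = np.rec.array([((1, 2, 3), -50.0)], rec_type_list)
new_row = np.rec.array([((1, 2, 4), -25.0)], rec_type_list)
data_array = np.hstack((data_array, new_row))
assert data_array.shape == (2,)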
[docs] def set_tas(self, tas_name, tas_label, current_key, check_name=True): if check_name: package_dim = self.data_dimensions.package_dim tas_names = package_dim.get_tasnames() if ( tas_name.lower() not in tas_names and self._simulation_data.verbosity_level.value >= VerbosityLevel.normal.value ): print( "WARNING: Time array series name {} not found in any " "time series file".format(tas_name) ) # this is a time series array with a valid tas variable self.data_structure_type = DataStructureType.scalar try: self.set_data(f"{tas_label} {tas_name}", 0, key=current_key) except Exception as ex: type_, value_, traceback_ = sys.exc_info() structure = self.data_dimensions.structure raise MFDataException( structure.get_model(), structure.get_package(), structure.path, "storing data", structure.name, inspect.stack()[0][3], type_, value_, traceback_, None, self._simulation_data.debug, ex, )
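# What set_tas() ultimately stores is a scalar control string. For an
# assumed time array series named "recharge_ts":
tas_label, tas_name = "TIMEARRAYSERIES", "recharge_ts"  # illustrative names
assert f"{tas_label} {tas_name}" == "TIMEARRAYSERIES recharge_ts"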
[docs] def resolve_data_size(self, index): # Resolves the size of a given data element based on the names in the # existing rec_array. Assumes repeating data element names follow the # format <data_element_name>_X if self.data_structure_type != DataStructureType.recarray: message = ( "Data structure type is {}. Data structure type must " "be recarray.".format(self.data_structure_type) ) type_, value_, traceback_ = sys.exc_info() raise MFDataException( self.data_dimensions.structure.get_model(), self.data_dimensions.structure.get_package(), self.data_dimensions.structure.path, "resolving data size", self.data_dimensions.structure.name, inspect.stack()[0][3], type_, value_, traceback_, message, self._simulation_data.debug, ) if len(self.layer_storage.first_item().internal_data[0]) <= index: return 0 label = self.layer_storage.first_item().internal_data.dtype.names[ index ] label_list = label.split("_") if len(label_list) == 1: return 1 internal_data = self.layer_storage.first_item().internal_data for forward_index in range(index + 1, len(internal_data.dtype.names)): forward_label = internal_data.dtype.names[forward_index] forward_label_list = forward_label.split("_") if forward_label_list[0] != label_list[0]: return forward_index - index return len(internal_data.dtype.names) - index
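# Sketch of the <data_element_name>_X convention resolve_data_size() relies
# on: counting consecutive dtype names that share a root gives the size of
# a repeating element. The field names here are assumptions for
# illustration.
import numpy as np

rtype = [("ks", object), ("aux_0", float), ("aux_1", float), ("obs", object)]
ra = np.rec.array([("WEL", 1.0, 2.0, "o1")], rtype)
names = ra.dtype.names  # ("ks", "aux_0", "aux_1", "obs")
root = names[1].split("_")[0]
size = sum(1 for name in names[1:] if name.split("_")[0] == root)
assert size == 2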
[docs]    def build_type_list(
        self,
        data_set=None,
        data=None,
        resolve_data_shape=True,
        key=None,
        nseg=None,
        cellid_expanded=False,
        min_size=False,
        overwrite_existing_type_list=True,
    ):
        if not overwrite_existing_type_list:
            existing_type_list = self._recarray_type_list
            existing_type_list_ex = self._recarray_type_list_ex
        if data_set is None:
            self.jagged_record = False
            self._recarray_type_list = []
            self._recarray_type_list_ex = []
            self.recarray_cellid_list = []
            self.recarray_cellid_list_ex = []
            data_set = self.data_dimensions.structure
        initial_keyword = True
        package_dim = self.data_dimensions.package_dim
        for index, data_item in enumerate(data_set.data_item_structures):
            # handle optional mnames
            if (
                not data_item.optional
                or len(data_item.name) < 5
                or data_item.name.lower()[0:5] != "mname"
                or not self.in_model
            ):
                overrides = self._data_type_overrides
                if len(self._recarray_type_list) in overrides:
                    data_type = overrides[len(self._recarray_type_list)]
                elif isinstance(data_item, MFDataItemStructure):
                    data_type = data_item.get_rec_type()
                else:
                    data_type = None
                if data_item.name.lower() == "aux" and resolve_data_shape:
                    aux_var_names = package_dim.get_aux_variables()
                    if aux_var_names is not None:
                        for aux_var_name in aux_var_names[0]:
                            if aux_var_name.lower() != "auxiliary":
                                self._append_type_lists(
                                    aux_var_name, data_type, False
                                )
                elif data_item.type == DatumType.record:
                    # record within a record, recurse. pass "data" by
                    # keyword so it is not mistaken for
                    # "resolve_data_shape"
                    self.build_type_list(data_item, data=data)
                elif data_item.type == DatumType.keystring:
                    self.jagged_record = True
                    self._append_type_lists(
                        data_item.name, data_type, data_item.is_cellid
                    )
                    # add potential data after keystring to type list
                    ks_data_item = deepcopy(data_item)
                    ks_data_item.type = DatumType.string
                    ks_data_item.name = f"{ks_data_item.name}_data"
                    ks_rec_type = ks_data_item.get_rec_type()
                    if not min_size:
                        self._append_type_lists(
                            ks_data_item.name,
                            ks_rec_type,
                            ks_data_item.is_cellid,
                        )
                    if (
                        index == len(data_set.data_item_structures) - 1
                        and data is not None
                    ):
                        idx = 1
                        (
                            line_max_size,
                            line_min_size,
                        ) = self._get_max_min_data_line_size(data)
                        if min_size:
                            line_size = line_min_size
                        else:
                            line_size = line_max_size
                        type_list = self.resolve_typelist(data)
                        while len(type_list) < line_size:
                            # keystrings at the end of a line can contain
                            # items of variable length. assume everything at
                            # the end of the data line is related to the last
                            # keystring
                            name = f"{ks_data_item.name}_{idx}"
                            self._append_type_lists(
                                name, ks_rec_type, ks_data_item.is_cellid
                            )
                            idx += 1
                elif (
                    data_item.name != "boundname"
                    or self.data_dimensions.package_dim.boundnames()
                ):
                    # don't include initial keywords
                    if (
                        data_item.type != DatumType.keyword
                        or data_set.block_variable
                        or data_item.optional
                    ):
                        initial_keyword = False
                        shape_rule = None
                        if data_item.tagged:
                            if (
                                data_item.type != DatumType.string
                                and data_item.type != DatumType.keyword
                            ):
                                name = f"{data_item.name}_label"
                                self._append_type_lists(
                                    name, object, data_item.is_cellid
                                )
                        if (
                            nseg is not None
                            and len(data_item.shape) > 0
                            and isinstance(data_item.shape[0], str)
                            and data_item.shape[0][0:4] == "nseg"
                        ):
                            # nseg explicitly specified. resolve any
                            # formula nseg is in
                            model_dim = self.data_dimensions.get_model_dim(
                                None
                            )
                            exp_array = model_dim.build_shape_expression(
                                data_item.shape
                            )
                            if (
                                isinstance(exp_array, list)
                                and len(exp_array) == 1
                            ):
                                exp = exp_array[0]
                                resolved_shape = [
                                    model_dim.resolve_exp(exp, nseg)
                                ]
                            else:
                                resolved_shape = [1]
                        else:
                            if resolve_data_shape:
                                data_dim = self.data_dimensions
                                (
                                    resolved_shape,
                                    shape_rule,
                                ) = data_dim.get_data_shape(
                                    data_item,
                                    data_set,
                                    data,
                                    repeating_key=key,
                                    min_size=min_size,
                                )
                            else:
                                resolved_shape = [1]
                        if (
                            not resolved_shape
                            or len(resolved_shape) == 0
                            or resolved_shape[0] == -1
                        ):
                            # could not resolve shape
                            resolved_shape = [1]
                        elif (
                            resolved_shape[0] == -9999
                            or shape_rule is not None
                        ):
                            if data is not None and not min_size:
                                # shape is an indeterminate 1-d array and
                                # should consume the remainder of the data
                                max_s = PyListUtil.max_multi_dim_list_size(
                                    data
                                )
                                resolved_shape[0] = max_s - len(
                                    self._recarray_type_list
                                )
                            else:
                                # shape is indeterminate 1-d array and either
                                # no data provided to resolve or request is
                                # for minimum data size
                                resolved_shape[0] = 1
                            if not min_size:
                                self.jagged_record = True
                        if data_item.is_cellid:
                            if (
                                data_item.shape is not None
                                and len(data_item.shape) > 0
                                and data_item.shape[0] == "ncelldim"
                            ):
                                # A cellid is a single entry (tuple) in the
                                # recarray. Adjust dimensions accordingly.
                                size = self._get_cellid_size(data_item.name)
                                data_item.remove_cellid(resolved_shape, size)
                        if not data_item.optional or not min_size:
                            for rep_index in range(0, resolved_shape[0]):
                                if resolved_shape[0] > 1:
                                    name = f"{data_item.name}_{rep_index}"
                                else:
                                    name = data_item.name
                                self._append_type_lists(
                                    name, data_type, data_item.is_cellid
                                )
        if cellid_expanded:
            new_type_list_ex = self._recarray_type_list_ex
            if not overwrite_existing_type_list:
                self._recarray_type_list = existing_type_list
                self._recarray_type_list_ex = existing_type_list_ex
            return new_type_list_ex
        else:
            new_type_list = self._recarray_type_list
            if not overwrite_existing_type_list:
                self._recarray_type_list = existing_type_list
                self._recarray_type_list_ex = existing_type_list_ex
            return new_type_list
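# Illustration of the two parallel type lists build_type_list() maintains:
# the compact form keeps a cellid as a single object field, while the
# expanded ("_ex") form splits it into layer/row/column integer columns.
# Field names and values are assumptions for illustration.
compact = [("cellid", object), ("q", float)]
expanded = [("layer", int), ("row", int), ("column", int), ("q", float)]
row = ((1, 2, 3), -50.0)
row_ex = (*row[0], row[1])  # (1, 2, 3, -50.0)
assert len(row_ex) == len(expanded)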
[docs] def get_default_mult(self): if self._data_type == DatumType.integer: return 1 else: return 1.0
    def _append_type_lists(self, name, data_type, iscellid):
        # add entries to the type lists
        self._recarray_type_list.append((name, data_type))
        self.recarray_cellid_list.append(iscellid)
        if iscellid and self._model_or_sim.model_type is not None:
            # write each part of the cellid out as a separate entry
            # to _recarray_type_list_ex
            cellid_size = self._get_cellid_size(name)
            # determine header for different grid types
            if cellid_size == 1:
                self._do_ex_list_append(name, int, iscellid)
            elif cellid_size == 2:
                self._do_ex_list_append("layer", int, iscellid)
                self._do_ex_list_append("cell2d_num", int, iscellid)
            else:
                self._do_ex_list_append("layer", int, iscellid)
                self._do_ex_list_append("row", int, iscellid)
                self._do_ex_list_append("column", int, iscellid)
        else:
            self._do_ex_list_append(name, data_type, iscellid)
        return iscellid

    def _do_ex_list_append(self, name, data_type, iscellid):
        self._recarray_type_list_ex.append((name, data_type))
        self.recarray_cellid_list_ex.append(iscellid)

    @staticmethod
    def _calc_data_size(data, count_to=None, current_length=None):
        if current_length is None:
            current_length = [0]
        if isinstance(data, np.ndarray):
            current_length[0] += data.size
            return data.size
        if isinstance(data, (str, dict)):
            return 1
        try:
            for data_item in data:
                if hasattr(data_item, "__len__"):
                    DataStorage._calc_data_size(
                        data_item, count_to, current_length
                    )
                else:
                    current_length[0] += 1
                if count_to is not None and current_length[0] >= count_to:
                    return current_length[0]
        except (ValueError, IndexError, TypeError):
            return 1
        return current_length[0]

    @staticmethod
    def _get_max_min_data_line_size(data):
        max_size = 0
        min_size = sys.maxsize
        if data is not None:
            for value in data:
                if len(value) > max_size:
                    max_size = len(value)
                if len(value) < min_size:
                    min_size = len(value)
        if min_size == sys.maxsize:
            min_size = 0
        return max_size, min_size
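# Sketch of the recursive size count performed by _calc_data_size() on a
# jagged nested list; the sample data is illustrative.
def calc_size(data):
    if isinstance(data, str) or not hasattr(data, "__len__"):
        return 1
    return sum(calc_size(item) for item in data)

assert calc_size([(1, 2, 3), (4, 5)]) == 5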
[docs] def get_data_dimensions(self, layer): data_dimensions = self.data_dimensions.get_data_shape()[0] if ( layer is not None and self.layer_storage.get_total_size() > 1 and self._has_layer_dim() ): # remove all "layer" dimensions from the list layer_dims = self.data_dimensions.structure.data_item_structures[ 0 ].layer_dims data_dimensions = data_dimensions[len(layer_dims) :] return data_dimensions
def _has_layer_dim(self): return ( "nlay" in self.data_dimensions.structure.shape or "nodes" in self.data_dimensions.structure.shape ) def _layer_prep(self, layer): if layer is None: # layer is none means the data provided is for all layers or this # is not layered data layer = (0,) self.layer_storage.list_shape = (1,) self.layer_storage.multi_dim_list = [ self.layer_storage.first_item() ] if isinstance(layer, int): layer = (layer,) return layer def _store_prep(self, layer, multiplier): if not (layer is None or self.layer_storage.in_shape(layer)): message = f"Layer {layer} is not a valid layer." type_, value_, traceback_ = sys.exc_info() raise MFDataException( self.data_dimensions.structure.get_model(), self.data_dimensions.structure.get_package(), self.data_dimensions.structure.path, "storing data", self.data_dimensions.structure.name, inspect.stack()[0][3], type_, value_, traceback_, message, self._simulation_data.debug, ) layer = self._layer_prep(layer) if multiplier is None: multiplier = self.get_default_mult() else: if isinstance(multiplier, float): multiplier = [multiplier] mult_ml = MultiList(multiplier) if not mult_ml.in_shape(layer): if multiplier[0] is None: multiplier = self.get_default_mult() else: multiplier = multiplier[0] else: if mult_ml.first_item() is None: multiplier = self.get_default_mult() else: multiplier = mult_ml.first_item() return layer, multiplier
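# Sketch of the multiplier normalization performed in _store_prep(): the
# caller may pass None, a bare float, or a (possibly nested) list, and a
# single scalar comes back. The 1.0 default mirrors get_default_mult() for
# float data; MultiList shape handling is omitted for brevity.
def normalize_mult(multiplier, default=1.0):
    if multiplier is None:
        return default
    if isinstance(multiplier, float):
        multiplier = [multiplier]
    first = multiplier[0]
    return default if first is None else first

assert normalize_mult(None) == 1.0
assert normalize_mult(2.5) == 2.5
assert normalize_mult([None]) == 1.0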
[docs] def get_data_size(self, layer): dimensions = self.get_data_dimensions(layer) data_size = 1 for dimension in dimensions: data_size = data_size * dimension return data_size
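# The loop above is equivalent to a product over the shape; shown with an
# assumed (nlay, nrow, ncol) = (3, 10, 15) grid.
import math

assert math.prod([3, 10, 15]) == 450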