# Source code for flopy.mf6.data.mffileaccess

import inspect
import sys
from copy import deepcopy

import numpy as np

from ...utils import datautil
from ...utils.binaryfile import BinaryHeader
from ...utils.datautil import DatumUtil, PyListUtil, find_keyword
from ..data.mfstructure import DataType, DatumType, MFDataStructure
from ..mfbase import MFDataException, VerbosityLevel
from .mfdatautil import MFComment, convert_data, to_string


class MFFileAccess:
    def __init__(
        self, structure, data_dimensions, simulation_data, path, current_key
    ):
        self.structure = structure
        self._data_dimensions = data_dimensions
        self._simulation_data = simulation_data
        self._path = path
        self._current_key = current_key
        self._pos = 0

    @staticmethod
    def _get_bintype(modelgrid):
        if modelgrid.grid_type == "vertex":
            return "vardisv"
        elif modelgrid.grid_type == "unstructured":
            return "vardisu"
        else:
            return "vardis"

    def _get_next_data_line(self, file_handle):
        end_of_file = False
        while not end_of_file:
            line = file_handle.readline()
            if line == "":
                message = (
                    "More data expected when reading {} from file "
                    "{}".format(self.structure.name, file_handle.name)
                )
                type_, value_, traceback_ = sys.exc_info()
                raise MFDataException(
                    self.structure.get_model(),
                    self.structure.get_package(),
                    self.structure.path,
                    "reading data from file",
                    self.structure.name,
                    inspect.stack()[0][3],
                    type_,
                    value_,
                    traceback_,
                    message,
                    self._simulation_data.debug,
                )
            clean_line = line.strip()
            # only return the line if it is not a comment or empty line
            if not MFComment.is_comment(clean_line, True):
                return datautil.PyListUtil.split_data_line(clean_line)

    def _read_pre_data_comments(
        self, line, file_handle, pre_data_comments, storage
    ):
        line_num = 0
        if pre_data_comments:
            storage.pre_data_comments = MFComment(
                pre_data_comments.text,
                self._path,
                self._simulation_data,
                line_num,
            )
        else:
            storage.pre_data_comments = None

        # read through any fully commented or empty lines
        PyListUtil.reset_delimiter_used()
        arr_line = PyListUtil.split_data_line(line)
        while MFComment.is_comment(arr_line, True) and line != "":
            if storage.pre_data_comments:
                storage.pre_data_comments.add_text("\n")
                storage.pre_data_comments.add_text(" ".join(arr_line))
            else:
                storage.pre_data_comments = MFComment(
                    arr_line, self._path, self._simulation_data, line_num
                )
            line = file_handle.readline()
            arr_line = PyListUtil.split_data_line(line)
        return line

    def _get_aux_var_index(self, aux_name):
        aux_var_index = None
        # confirm whether the keyword found is an auxiliary variable name
        aux_var_names = self._data_dimensions.package_dim.get_aux_variables()
        if aux_var_names:
            for aux_var_name, index in zip(
                aux_var_names[0], range(0, len(aux_var_names[0]))
            ):
                if aux_name.lower() == aux_var_name.lower():
                    aux_var_index = index - 1
        return aux_var_index

    def _load_keyword(self, arr_line, index_num, keyword):
        aux_var_index = None
        if keyword != "":
            # verify keyword
            keyword_found = arr_line[index_num].lower()
            keyword_match = keyword.lower() == keyword_found
            aux_var_names = None
            if not keyword_match:
                aux_var_index = self._get_aux_var_index(keyword_found)
            if not keyword_match and aux_var_index is None:
                aux_text = ""
                if aux_var_names is not None:
                    # TODO: aux_var_names is always None here, so this
                    # branch is never reached
                    aux_text = f" or auxiliary variables {aux_var_names[0]}"
                message = (
                    'Error reading variable "{}". Expected '
                    'variable keyword "{}"{} not found '
                    'at line "{}". '
                    "{}".format(
                        self.structure.name,
                        keyword,
                        aux_text,
                        " ".join(arr_line),
                        self._path,
                    )
                )
                type_, value_, traceback_ = sys.exc_info()
                raise MFDataException(
                    self.structure.get_model(),
                    self.structure.get_package(),
                    self.structure.path,
                    "loading keyword",
                    self.structure.name,
                    inspect.stack()[0][3],
                    type_,
                    value_,
                    traceback_,
                    message,
                    self._simulation_data.debug,
                )
            return (index_num + 1, aux_var_index)
        return (index_num, aux_var_index)

    def _open_ext_file(self, fname, binary=False, write=False):
        model_dim = self._data_dimensions.package_dim.model_dim[0]
        read_file = self._simulation_data.mfpath.resolve_path(
            fname, model_dim.model_name
        )
        if write:
            options = "w"
        else:
            options = "r"
        if binary:
            options = f"{options}b"
        try:
            fd = open(read_file, options)
            return fd
        except Exception:
            message = (
                "Unable to open file {} in mode {}. Make sure the "
                "file is not locked and the folder exists"
                ".".format(read_file, options)
            )
            type_, value_, traceback_ = sys.exc_info()
            raise MFDataException(
                self._data_dimensions.structure.get_model(),
                self._data_dimensions.structure.get_package(),
                self._data_dimensions.structure.path,
                "opening external file",
                self._data_dimensions.structure.name,
                inspect.stack()[0][3],
                type_,
                value_,
                traceback_,
                message,
                self._simulation_data.debug,
            )

    @staticmethod
    def datum_to_numpy_type(datum_type):
        if datum_type == DatumType.integer:
            return np.int32, "int"
        elif datum_type == DatumType.double_precision:
            return np.float64, "double"
        elif datum_type == DatumType.string or datum_type == DatumType.keyword:
            return str, "str"
        else:
            return None, None
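
# --- Illustrative sketch (editor-added, not part of flopy) ------------------
# Shows how the datum_to_numpy_type mapping above is typically used: resolve
# a numpy dtype for a MODFLOW 6 datum type, then convert raw text tokens the
# same way the text readers in this module do.  Only names already defined
# in this module are assumed.
def _example_datum_to_numpy_type():
    numpy_type, type_name = MFFileAccess.datum_to_numpy_type(
        DatumType.double_precision
    )
    assert type_name == "double"
    # split a whitespace-delimited record and convert with the resolved dtype
    tokens = "1.0 2.5 3.25".split()
    return np.fromiter(tokens, dtype=numpy_type)
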

class MFFileAccessArray(MFFileAccess):
    def __init__(
        self, structure, data_dimensions, simulation_data, path, current_key
    ):
        super().__init__(
            structure, data_dimensions, simulation_data, path, current_key
        )

    def write_binary_file(
        self,
        data,
        fname,
        text,
        modelgrid=None,
        modeltime=None,
        stress_period=0,
        precision="double",
        write_multi_layer=False,
    ):
        data = self._resolve_cellid_numbers_to_file(data)
        fd = self._open_ext_file(fname, binary=True, write=True)
        if data.size == modelgrid.nnodes:
            write_multi_layer = False
        if write_multi_layer:
            # write data from each layer with a separate header
            for layer, value in enumerate(data):
                self._write_layer(
                    fd,
                    value,
                    modelgrid,
                    modeltime,
                    stress_period,
                    precision,
                    text,
                    fname,
                    layer + 1,
                )
        else:
            # write data with a single header
            self._write_layer(
                fd,
                data,
                modelgrid,
                modeltime,
                stress_period,
                precision,
                text,
                fname,
            )
        fd.close()

    def _write_layer(
        self,
        fd,
        data,
        modelgrid,
        modeltime,
        stress_period,
        precision,
        text,
        fname,
        ilay=None,
    ):
        header_data = self._get_header(
            modelgrid,
            modeltime,
            stress_period,
            precision,
            text,
            fname,
            ilay=ilay,
            data=data,
        )
        header_data.tofile(fd)
        data.tofile(fd)

    def _get_header(
        self,
        modelgrid,
        modeltime,
        stress_period,
        precision,
        text,
        fname,
        ilay=None,
        data=None,
    ):
        # handle dis (row, col, lay), disv (ncpl, lay), and disu (nodes)
        # cases
        if modelgrid is not None and modeltime is not None:
            pertim = modeltime.perlen[stress_period]
            totim = modeltime.perlen.sum()
            if ilay is None:
                ilay = modelgrid.nlay
            if modelgrid.grid_type == "structured":
                m1, m2, m3 = modelgrid.ncol, modelgrid.nrow, ilay
                if data is not None:
                    shape3d = modelgrid.nlay * modelgrid.nrow * modelgrid.ncol
                    if data.size == shape3d:
                        m1, m2, m3 = shape3d, 1, 1
                return BinaryHeader.create(
                    bintype="vardis",
                    precision=precision,
                    text=text,
                    m1=m1,
                    m2=m2,
                    m3=m3,
                    pertim=pertim,
                    totim=totim,
                    kstp=1,
                    kper=stress_period + 1,
                )
            elif modelgrid.grid_type == "vertex":
                if ilay is None:
                    ilay = modelgrid.nlay
                m1, m2, m3 = modelgrid.ncpl, 1, ilay
                if data is not None:
                    shape3d = modelgrid.nlay * modelgrid.ncpl
                    if data.size == shape3d:
                        m1, m2, m3 = shape3d, 1, 1
                return BinaryHeader.create(
                    bintype="vardisv",
                    precision=precision,
                    text=text,
                    m1=m1,
                    m2=m2,
                    m3=m3,
                    pertim=pertim,
                    totim=totim,
                    kstp=1,
                    kper=stress_period,
                )
            elif modelgrid.grid_type == "unstructured":
                m1, m2, m3 = modelgrid.nnodes, 1, 1
                return BinaryHeader.create(
                    bintype="vardisu",
                    precision=precision,
                    text=text,
                    m1=m1,
                    m2=1,
                    m3=1,
                    pertim=pertim,
                    totim=totim,
                    kstp=1,
                    kper=stress_period,
                )
            else:
                if ilay is None:
                    ilay = 1
                m1, m2, m3 = 1, 1, ilay
                header = BinaryHeader.create(
                    bintype="vardis",
                    precision=precision,
                    text=text,
                    m1=m1,
                    m2=m2,
                    m3=m3,
                    pertim=pertim,
                    totim=totim,
                    kstp=1,
                    kper=stress_period,
                )
                if (
                    self._simulation_data.verbosity_level.value
                    >= VerbosityLevel.normal.value
                ):
                    print(
                        "Model grid does not have a valid type. Using "
                        "default spatial discretization header values for "
                        "binary file {}.".format(fname)
                    )
        else:
            m1, m2, m3 = 1, 1, 1
            pertim = np.float64(1.0)
            header = BinaryHeader.create(
                bintype="vardis",
                precision=precision,
                text=text,
                m1=m1,
                m2=m2,
                m3=m3,
                pertim=pertim,
                totim=pertim,
                kstp=1,
                kper=stress_period,
            )
            if (
                self._simulation_data.verbosity_level.value
                >= VerbosityLevel.normal.value
            ):
                print(
                    "Binary file data not part of a model. Using default "
                    "spatial discretization header values for binary file "
                    "{}.".format(fname)
                )
        return header

    def write_text_file(self, data, fp, data_type, data_size):
        try:
            fd = open(fp, "w")
        except Exception:
            message = (
                "Unable to open file {}. Make sure the file "
                "is not locked and the folder exists"
                ".".format(fp)
            )
            type_, value_, traceback_ = sys.exc_info()
            raise MFDataException(
                self._data_dimensions.structure.get_model(),
                self._data_dimensions.structure.get_package(),
                self._data_dimensions.structure.path,
                "opening external file for writing",
                self.structure.name,
                inspect.stack()[0][3],
                type_,
                value_,
                traceback_,
                message,
                self._simulation_data.debug,
            )
        fd.write(self.get_data_string(data, data_type, ""))
        fd.close()

    def read_binary_data_from_file(
        self,
        fname,
        data_shape,
        data_size,
        data_type,
        modelgrid,
        read_multi_layer=False,
    ):
        import flopy.utils.binaryfile as bf

        if not isinstance(modelgrid.ncpl, np.ndarray):
            if data_size != modelgrid.ncpl:
                read_multi_layer = True
        fd = self._open_ext_file(fname, True)
        numpy_type, name = self.datum_to_numpy_type(data_type)
        header_dtype = bf.BinaryHeader.set_dtype(
            bintype=self._get_bintype(modelgrid), precision="double"
        )
        if read_multi_layer and len(data_shape) > 1:
            try:
                all_data = np.empty(data_shape, numpy_type)
                headers = []
                layer_shape = data_shape[1:]
                layer_data_size = int(data_size / data_shape[0])
                for index in range(0, data_shape[0]):
                    layer_data = self._read_binary_file_layer(
                        fd,
                        fname,
                        header_dtype,
                        numpy_type,
                        layer_data_size,
                        layer_shape,
                    )
                    all_data[index, :] = layer_data[0]
                    headers.append(layer_data[1])
                fd.close()
                return all_data, headers
            except MFDataException:
                # multi-layer read failed; rewind and fall back to reading
                # the data as a single layer
                fd.seek(self._pos, 0)
                bin_data = self._read_binary_file_layer(
                    fd, fname, header_dtype, numpy_type, data_size, data_shape
                )
                self._pos = fd.tell()
                fd.close()
                return bin_data
        else:
            bin_data = self._read_binary_file_layer(
                fd, fname, header_dtype, numpy_type, data_size, data_shape
            )
            fd.close()
            return bin_data

    def get_data_string(self, data, data_type, data_indent=""):
        layer_data_string = [str(data_indent)]
        line_data_count = 0
        indent_str = self._simulation_data.indent_string
        data_iter = datautil.PyListUtil.next_item(data)
        is_cellid = (
            self.structure.data_item_structures[0].numeric_index
            or self.structure.data_item_structures[0].is_cellid
        )
        jag_arr = self.structure.data_item_structures[0].jagged_array
        jagged_def = None
        jagged_def_index = 0
        if jag_arr is not None:
            # get jagged array definition
            jagged_def_path = self._path[0:-1] + (jag_arr,)
            if jagged_def_path in self._simulation_data.mfdata:
                jagged_def = self._simulation_data.mfdata[
                    jagged_def_path
                ].array

        for item, last_item, new_list, nesting_change in data_iter:
            # increment data/layer counts
            line_data_count += 1
            try:
                data_lyr = to_string(
                    item,
                    data_type,
                    self._simulation_data,
                    self._data_dimensions,
                    is_cellid,
                    verify_data=self._simulation_data.verify_data,
                )
            except Exception as ex:
                type_, value_, traceback_ = sys.exc_info()
                comment = (
                    'Could not convert data "{}" of type "{}" to a '
                    "string.".format(item, data_type)
                )
                raise MFDataException(
                    self.structure.get_model(),
                    self.structure.get_package(),
                    self._path,
                    "converting data",
                    self.structure.name,
                    inspect.stack()[0][3],
                    type_,
                    value_,
                    traceback_,
                    comment,
                    self._simulation_data.debug,
                    ex,
                )
            layer_data_string[-1] += f"{indent_str}{data_lyr}"
            if jagged_def is not None:
                if line_data_count == jagged_def[jagged_def_index]:
                    layer_data_string.append(str(data_indent))
                    line_data_count = 0
                    jagged_def_index += 1
            else:
                if self._simulation_data.wrap_multidim_arrays and (
                    line_data_count
                    == self._simulation_data.max_columns_of_data
                    or last_item
                ):
                    layer_data_string.append(str(data_indent))
                    line_data_count = 0
        if len(layer_data_string) > 0:
            # clean up the text at the end of the array
            layer_data_string[-1] = layer_data_string[-1].strip()
        if len(layer_data_string) == 1:
            return f"{data_indent}{layer_data_string[0].rstrip()}\n"
        else:
            return "\n".join(layer_data_string)

    def _read_binary_file_layer(
        self, fd, fname, header_dtype, numpy_type, data_size, data_shape
    ):
        header_data = np.fromfile(fd, dtype=header_dtype, count=1)
        data = np.fromfile(fd, dtype=numpy_type, count=data_size)
        data = self._resolve_cellid_numbers_from_file(data)
        if data.size != data_size:
            message = (
                "Binary file {} does not contain expected data. "
                "Expected array size {} but found size "
                "{}.".format(fname, data_size, data.size)
            )
            type_, value_, traceback_ = sys.exc_info()
            raise MFDataException(
                self._data_dimensions.structure.get_model(),
                self._data_dimensions.structure.get_package(),
                self._data_dimensions.structure.path,
                "reading binary data from file",
                self.structure.name,
                inspect.stack()[0][3],
                type_,
                value_,
                traceback_,
                message,
                self._simulation_data.debug,
            )
        return data.reshape(data_shape), header_data

    def read_text_data_from_file(
        self,
        data_size,
        data_type,
        data_dim,
        layer,
        layered,
        fname=None,
        fd=None,
    ):
        # determine line size
        line_size = None
        if layered:
            # if the array is layered (meaning a control record for
            # each layer), then limit line size to number of columns
            if isinstance(data_dim, list):
                line_size = data_dim[-1]

        # load variable data from file
        current_size = 0
        if layer is None:
            layer = 0
        close_file = False
        if fd is None:
            close_file = True
            fd = self._open_ext_file(fname)
        data_raw = []
        line = " "
        PyListUtil.reset_delimiter_used()
        if data_size < 0:
            # data size is not defined, load data until an end of data
            # indicator is found
            while True:
                line = fd.readline()
                arr_line = PyListUtil.split_data_line(line, True)
                if line == "" or arr_line[0].upper() == "END":
                    break
                if not MFComment.is_comment(arr_line, True):
                    if line_size is not None:
                        arr_line = arr_line[:line_size]
                    data_raw += arr_line
                else:
                    PyListUtil.reset_delimiter_used()
        else:
            while line != "" and len(data_raw) < data_size:
                line = fd.readline()
                arr_line = PyListUtil.split_data_line(line, True)
                if not MFComment.is_comment(arr_line, True):
                    if line_size is not None:
                        arr_line = arr_line[:line_size]
                    data_raw += arr_line
                else:
                    PyListUtil.reset_delimiter_used()

            if len(data_raw) < data_size:
                message = (
                    'Not enough data in file {} for data "{}". '
                    "Expected data size {} but only found "
                    "{}.".format(
                        fd.name,
                        self._data_dimensions.structure.name,
                        data_size,
                        len(data_raw),
                    )
                )
                type_, value_, traceback_ = sys.exc_info()
                if close_file:
                    fd.close()
                raise MFDataException(
                    self._data_dimensions.structure.get_model(),
                    self._data_dimensions.structure.get_package(),
                    self._data_dimensions.structure.path,
                    "reading data file",
                    self._data_dimensions.structure.name,
                    inspect.stack()[0][3],
                    type_,
                    value_,
                    traceback_,
                    message,
                    self._simulation_data.debug,
                )

        if data_type == DatumType.double_precision:
            data_type = np.float64
        elif data_type == DatumType.integer:
            data_type = np.int32
        if data_size < 0:
            data_out = np.fromiter(data_raw, dtype=data_type)
        else:
            data_out = np.fromiter(data_raw, dtype=data_type, count=data_size)
        data_out = self._resolve_cellid_numbers_from_file(data_out)

        if close_file:
            fd.close()

        if data_size >= 0:
            data_out = np.reshape(data_out, data_dim)
        return data_out, current_size

    def load_from_package(
        self,
        first_line,
        file_handle,
        layer_shape,
        storage,
        keyword,
        pre_data_comments=None,
    ):
        # read in any pre data comments
        current_line = self._read_pre_data_comments(
            first_line, file_handle, pre_data_comments, storage
        )
        datautil.PyListUtil.reset_delimiter_used()
        arr_line = datautil.PyListUtil.split_data_line(current_line)
        package_dim = self._data_dimensions.package_dim
        if len(arr_line) > 2:
            # check for time array series
            if arr_line[1].upper() == "TIMEARRAYSERIES":
                storage.set_tas(arr_line[2], arr_line[1], self._current_key)
                return layer_shape, [False, None]

        if not self.structure.data_item_structures[0].just_data:
            # verify keyword
            index_num, aux_var_index = self._load_keyword(arr_line, 0, keyword)
        else:
            index_num = 0
            aux_var_index = None

        # TODO: Add species support
        # if layered is supported, look for the layered flag
        if self.structure.layered or aux_var_index is not None:
            if (
                len(arr_line) > index_num
                and arr_line[index_num].lower() == "layered"
            ):
                storage.layered = True
                try:
                    layers = layer_shape
                except Exception as ex:
                    type_, value_, traceback_ = sys.exc_info()
                    raise MFDataException(
                        self.structure.get_model(),
                        self.structure.get_package(),
                        self._path,
                        "resolving layer dimensions",
                        self.structure.name,
                        inspect.stack()[0][3],
                        type_,
                        value_,
                        traceback_,
                        None,
                        self._simulation_data.debug,
                        ex,
                    )
                if len(layers) > 0:
                    storage.init_layers(layers)
            elif aux_var_index is not None:
                # each layer stores a different aux variable
                layers = len(package_dim.get_aux_variables()[0]) - 1
                layer_shape = (layers,)
                storage.layered = True
                while storage.layer_storage.list_shape[0] < layers:
                    storage.add_layer()
            else:
                storage.flatten()
        try:
            dimensions = storage.get_data_dimensions(layer_shape)
        except Exception as ex:
            type_, value_, traceback_ = sys.exc_info()
            comment = (
                f'Could not get data shape for key "{self._current_key}".'
            )
            raise MFDataException(
                self.structure.get_model(),
                self.structure.get_package(),
                self._path,
                "getting data shape",
                self.structure.name,
                inspect.stack()[0][3],
                type_,
                value_,
                traceback_,
                comment,
                self._simulation_data.debug,
                ex,
            )
        layer_size = 1
        for dimension in dimensions:
            layer_size *= dimension

        if aux_var_index is None:
            # loop through the number of layers
            for layer in storage.layer_storage.indexes():
                self._load_layer(
                    layer,
                    layer_size,
                    storage,
                    arr_line,
                    file_handle,
                    layer_shape,
                )
        else:
            # write the aux var to its unique index
            self._load_layer(
                (aux_var_index,),
                layer_size,
                storage,
                arr_line,
                file_handle,
                layer_shape,
            )
        return layer_shape, [False, None]

    def _load_layer(
        self, layer, layer_size, storage, arr_line, file_handle, layer_shape
    ):
        di_struct = self.structure.data_item_structures[0]
        if not di_struct.just_data or datautil.max_tuple_abs_size(layer) > 0:
            arr_line = self._get_next_data_line(file_handle)

        layer_storage = storage.layer_storage[layer]
        # if constant
        if arr_line[0].upper() == "CONSTANT":
            if len(arr_line) < 2:
                message = (
                    'MFArray "{}" contains a CONSTANT that is not '
                    "followed by a number.".format(self.structure.name)
                )
                type_, value_, traceback_ = sys.exc_info()
                raise MFDataException(
                    self.structure.get_model(),
                    self.structure.get_package(),
                    self._path,
                    "loading data layer from file",
                    self.structure.name,
                    inspect.stack()[0][3],
                    type_,
                    value_,
                    traceback_,
                    message,
                    self._simulation_data.debug,
                )
            # store data
            layer_storage.set_internal_constant()
            try:
                storage.store_internal(
                    [
                        convert_data(
                            arr_line[1],
                            self._data_dimensions,
                            self.structure.type,
                            di_struct,
                        )
                    ],
                    layer,
                    const=True,
                )
            except Exception as ex:
                type_, value_, traceback_ = sys.exc_info()
                raise MFDataException(
                    self.structure.get_model(),
                    self.structure.get_package(),
                    self._path,
                    "storing data",
                    self.structure.name,
                    inspect.stack()[0][3],
                    type_,
                    value_,
                    traceback_,
                    None,
                    self._simulation_data.debug,
                    ex,
                )
            # store anything else as a comment
            if len(arr_line) > 2:
                layer_storage.comments = MFComment(
                    " ".join(arr_line[2:]),
                    self._path,
                    self._simulation_data,
                    layer,
                )
        # if internal
        elif arr_line[0].upper() == "INTERNAL":
            try:
                multiplier, print_format = storage.process_internal_line(
                    arr_line
                )
            except Exception as ex:
                type_, value_, traceback_ = sys.exc_info()
                raise MFDataException(
                    self.structure.get_model(),
                    self.structure.get_package(),
                    self._path,
                    "processing line of data",
                    self.structure.name,
                    inspect.stack()[0][3],
                    type_,
                    value_,
                    traceback_,
                    None,
                    self._simulation_data.debug,
                    ex,
                )
            storage.layer_storage[layer].set_internal_array()

            # store anything else as a comment
            if len(arr_line) > 5:
                layer_storage.comments = MFComment(
                    " ".join(arr_line[5:]),
                    self._path,
                    self._simulation_data,
                    layer,
                )

            try:
                # load variable data from current file
                if multiplier is not None:
                    storage.layer_storage[layer].factor = multiplier
                if print_format is not None:
                    storage.layer_storage[layer].iprn = print_format
                data_type = storage.data_dimensions.structure.get_datum_type(
                    True
                )
                data_from_file = self.read_text_data_from_file(
                    storage.get_data_size(layer),
                    data_type,
                    storage.get_data_dimensions(layer),
                    layer,
                    storage.layered,
                    fd=file_handle,
                )
            except Exception as ex:
                type_, value_, traceback_ = sys.exc_info()
                raise MFDataException(
                    self.structure.get_model(),
                    self.structure.get_package(),
                    self._path,
                    f"reading data from file {file_handle.name}",
                    self.structure.name,
                    inspect.stack()[0][3],
                    type_,
                    value_,
                    traceback_,
                    None,
                    self._simulation_data.debug,
                    ex,
                )
            data_shaped = self._resolve_data_shape(
                data_from_file[0], layer_shape, storage
            )
            try:
                storage.store_internal(
                    data_shaped,
                    layer,
                    const=False,
                    multiplier=[multiplier],
                    print_format=print_format,
                )
            except Exception as ex:
                comment = f'Could not store data: "{data_shaped}"'
                type_, value_, traceback_ = sys.exc_info()
                raise MFDataException(
                    self.structure.get_model(),
                    self.structure.get_package(),
                    self._path,
                    "storing data",
                    self.structure.name,
                    inspect.stack()[0][3],
                    type_,
                    value_,
                    traceback_,
                    comment,
                    self._simulation_data.debug,
                    ex,
                )
        elif arr_line[0].upper() == "OPEN/CLOSE":
            try:
                storage.process_open_close_line(arr_line, layer)
            except Exception as ex:
                comment = (
                    "Could not open open/close file specified by"
                    ' "{}".'.format(" ".join(arr_line))
                )
                type_, value_, traceback_ = sys.exc_info()
                raise MFDataException(
                    self.structure.get_model(),
                    self.structure.get_package(),
                    self._path,
                    "storing data",
                    self.structure.name,
                    inspect.stack()[0][3],
                    type_,
                    value_,
                    traceback_,
                    comment,
                    self._simulation_data.debug,
                    ex,
                )

    def _is_cellid_or_numeric_index(self):
        if (
            self.structure.data_item_structures[0].numeric_index
            or self.structure.data_item_structures[0].is_cellid
        ):
            return True
        return False

    def _resolve_cellid_numbers_to_file(self, data):
        # MODFLOW 6 files store one-based ids; flopy data is zero-based
        if self._is_cellid_or_numeric_index():
            return abs(data) + 1
        else:
            return data

    def _resolve_cellid_numbers_from_file(self, data):
        if self._is_cellid_or_numeric_index():
            return abs(data) - 1
        else:
            return data

    def _resolve_data_shape(self, data, layer_shape, storage):
        try:
            dimensions = storage.get_data_dimensions(layer_shape)
        except Exception as ex:
            type_, value_, traceback_ = sys.exc_info()
            comment = (
                f'Could not get data shape for key "{self._current_key}".'
            )
            raise MFDataException(
                self.structure.get_model(),
                self.structure.get_package(),
                self._path,
                "getting data shape",
                self.structure.name,
                inspect.stack()[0][3],
                type_,
                value_,
                traceback_,
                comment,
                self._simulation_data.debug,
                ex,
            )
        if isinstance(data, list) or isinstance(data, np.ndarray):
            try:
                return np.reshape(data, dimensions).tolist()
            except Exception as ex:
                type_, value_, traceback_ = sys.exc_info()
                comment = (
                    f'Could not reshape data to dimensions "{dimensions}".'
                )
                raise MFDataException(
                    self.structure.get_model(),
                    self.structure.get_package(),
                    self._path,
                    "reshaping data",
                    self.structure.name,
                    inspect.stack()[0][3],
                    type_,
                    value_,
                    traceback_,
                    comment,
                    self._simulation_data.debug,
                    ex,
                )
        else:
            return data
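
# --- Illustrative sketch (editor-added, not part of flopy) ------------------
# Demonstrates the zero-based/one-based round trip performed by
# _resolve_cellid_numbers_to_file and _resolve_cellid_numbers_from_file
# above: MODFLOW 6 files store one-based cell ids, while flopy keeps
# zero-based ids in memory.
def _example_cellid_round_trip():
    in_memory = np.array([0, 1, 2], dtype=np.int32)
    on_disk = np.abs(in_memory) + 1  # written to file as 1, 2, 3
    restored = np.abs(on_disk) - 1  # read back as 0, 1, 2
    assert (restored == in_memory).all()
    return on_disk
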

class MFFileAccessList(MFFileAccess):
    def __init__(
        self, structure, data_dimensions, simulation_data, path, current_key
    ):
        super().__init__(
            structure, data_dimensions, simulation_data, path, current_key
        )
        self._last_line_info = []
        self.simple_line = False

    def read_binary_data_from_file(
        self, read_file, modelgrid, precision="double", build_cellid=True
    ):
        # read from file
        header, int_cellid_indexes, ext_cellid_indexes = self._get_header(
            modelgrid, precision
        )
        file_array = np.fromfile(read_file, dtype=header, count=-1)
        if not build_cellid:
            return file_array
        # build data list for recarray
        cellid_size = len(self._get_cell_header(modelgrid))
        data_list = []
        for record in file_array:
            data_record = ()
            current_cellid_size = 0
            current_cellid = ()
            for index, data_item in enumerate(record):
                if index in ext_cellid_indexes:
                    current_cellid += (data_item - 1,)
                    current_cellid_size += 1
                    if current_cellid_size == cellid_size:
                        data_record += current_cellid
                        data_record = (data_record,)
                        current_cellid = ()
                        current_cellid_size = 0
                else:
                    data_record += (data_item,)
            data_list.append(data_record)
        return data_list

    def write_binary_file(
        self, data, fname, modelgrid=None, precision="double"
    ):
        fd = self._open_ext_file(fname, binary=True, write=True)
        data_array = self._build_data_array(data, modelgrid, precision)
        data_array.tofile(fd)
        fd.close()

    def _build_data_array(self, data, modelgrid, precision):
        header, int_cellid_indexes, ext_cellid_indexes = self._get_header(
            modelgrid, precision
        )
        data_list = []
        for record in data:
            new_record = ()
            for index, column in enumerate(record):
                if index in int_cellid_indexes:
                    # flatten the cellid tuple into one-based indices
                    if isinstance(column, int):
                        new_record += (column + 1,)
                    else:
                        for item in column:
                            new_record += (item + 1,)
                else:
                    new_record += (column,)
            data_list.append(new_record)
        return np.array(data_list, dtype=header)

    def _get_header(self, modelgrid, precision):
        np_flt_type = np.float64
        header = []
        int_cellid_indexes = {}
        ext_cellid_indexes = {}
        ext_index = 0
        for index, di_struct in enumerate(self.structure.data_item_structures):
            if di_struct.is_cellid:
                cell_header = self._get_cell_header(modelgrid)
                header += cell_header
                int_cellid_indexes[index] = True
                for ext_idx in range(ext_index, ext_index + len(cell_header)):
                    ext_cellid_indexes[ext_idx] = True
                ext_index += len(cell_header)
            elif not di_struct.optional:
                header.append((di_struct.name, np_flt_type))
                ext_index += 1
            elif di_struct.name == "aux":
                aux_var_names = (
                    self._data_dimensions.package_dim.get_aux_variables()
                )
                if aux_var_names is not None:
                    for aux_var_name in aux_var_names[0]:
                        if aux_var_name.lower() != "auxiliary":
                            header.append((aux_var_name, np_flt_type))
                            ext_index += 1
        return header, int_cellid_indexes, ext_cellid_indexes

    def _get_cell_header(self, modelgrid):
        if modelgrid.grid_type == "structured":
            return [("layer", np.int32), ("row", np.int32), ("col", np.int32)]
        elif modelgrid.grid_type == "vertex":
            return [("layer", np.int32), ("ncpl", np.int32)]
        else:
            return [("nodes", np.int32)]

    def load_from_package(
        self, first_line, file_handle, storage, pre_data_comments=None
    ):
        # lock things to maximize performance
        self._data_dimensions.lock()
        self._last_line_info = []
        self._data_line = None
        if first_line is None:
            first_line = file_handle.readline()
        # read in any pre data comments
        current_line = self._read_pre_data_comments(
            first_line, file_handle, pre_data_comments, storage
        )
        # reset data line delimiter so that the next split_data_line will
        # automatically determine the delimiter
        datautil.PyListUtil.reset_delimiter_used()
        arr_line = datautil.PyListUtil.split_data_line(current_line)
        if arr_line and (
            len(arr_line[0]) >= 2 and arr_line[0][:3].upper() == "END"
        ):
            return [False, arr_line]
        if len(arr_line) >= 2 and arr_line[0].upper() == "OPEN/CLOSE":
            try:
                storage.process_open_close_line(arr_line, (0,))
            except Exception as ex:
                message = (
                    "An error occurred while processing the following "
                    "open/close line: {}".format(current_line)
                )
                type_, value_, traceback_ = sys.exc_info()
                raise MFDataException(
                    self.structure.get_model(),
                    self.structure.get_package(),
                    self._path,
                    "processing open/close line",
                    self.structure.name,
                    inspect.stack()[0][3],
                    type_,
                    value_,
                    traceback_,
                    message,
                    self._simulation_data.debug,
                    ex,
                )
        else:
            (
                have_newrec_line,
                newrec_line,
                self._data_line,
            ) = self.read_list_data_from_file(
                file_handle,
                storage,
                self._current_key,
                current_line,
                self._data_line,
            )
            return [have_newrec_line, newrec_line]

        # loop until end of block
        line = " "
        while line != "":
            arr_line = self._get_next_data_line(file_handle)
            if arr_line and (
                len(arr_line[0]) >= 2 and arr_line[0][:3].upper() == "END"
            ):
                # end of block
                self._data_dimensions.unlock()
                return [False, line]
        self._data_dimensions.unlock()
        return [False, None]

    def read_list_data_from_file(
        self,
        file_handle,
        storage,
        current_key,
        current_line=None,
        data_line=None,
        store_internal=True,
    ):
        self._data_dimensions.package_dim.locked = True
        data_rec = None
        data_loaded = []
        self._temp_dict = {}
        self._last_line_info = []
        store_data = False
        struct = self.structure
        self.simple_line = (
            len(self._data_dimensions.package_dim.get_tsnames()) == 0
            and not struct.is_mname
        )
        for data_item in struct.data_item_structures:
            if (
                data_item.optional
                and data_item.name != "boundname"
                and data_item.name != "aux"
            ):
                self.simple_line = False
        if current_line is None:
            current_line = file_handle.readline()
        PyListUtil.reset_delimiter_used()
        arr_line = PyListUtil.split_data_line(current_line)
        line_num = 0

        # read any pre-data commented lines
        while current_line and MFComment.is_comment(arr_line, True):
            arr_line.insert(0, "\n")
            if storage.pre_data_comments is None:
                storage.pre_data_comments = MFComment(
                    " ".join(arr_line),
                    self._path,
                    self._simulation_data,
                    line_num,
                )
            else:
                storage.pre_data_comments.add_text(" ".join(arr_line))
            PyListUtil.reset_delimiter_used()
            current_line = file_handle.readline()
            arr_line = PyListUtil.split_data_line(current_line)

        try:
            data_line = self.load_list_line(
                storage,
                arr_line,
                line_num,
                data_loaded,
                True,
                current_key=current_key,
                data_line=data_line,
            )[1:]
            line_num += 1
            store_data = True
        except MFDataException as err:
            # this could possibly be a constant line
            line = file_handle.readline()
            arr_line = PyListUtil.split_data_line(line)
            if (
                len(arr_line) >= 2
                and arr_line[0].upper() == "CONSTANT"
                and len(struct.data_item_structures) >= 2
                and struct.data_item_structures[0].name.upper() == "CELLID"
            ):
                # store the first line as a comment
                if storage.pre_data_comments is None:
                    storage.pre_data_comments = MFComment(
                        current_line, struct.path, self._simulation_data, 0
                    )
                else:
                    storage.pre_data_comments.add_text(current_line)
                # store constant value for all cellids
                storage.layer_storage.first_item().set_internal_constant()
                if store_internal:
                    storage.store_internal(
                        convert_data(
                            arr_line[1],
                            self._data_dimensions,
                            struct.data_item_structures[1].type,
                            struct.data_item_structures[0],
                        ),
                        0,
                        const=True,
                    )
                else:
                    data_rec = storage._build_recarray(
                        arr_line[1], None, True
                    )
                line = " "
                while line != "":
                    line = file_handle.readline()
                    arr_line = PyListUtil.split_data_line(line)
                    if arr_line and (
                        len(arr_line[0]) >= 2
                        and arr_line[0][:3].upper() == "END"
                    ):
                        return [False, line, data_line]
            else:
                # not a constant or open/close line; the exception is valid
                comment = (
                    "Unable to process line 1 of data list: "
                    f'"{current_line}"'
                )
                type_, value_, traceback_ = sys.exc_info()
                raise MFDataException(
                    struct.get_model(),
                    struct.get_package(),
                    struct.path,
                    "loading data list from package file",
                    struct.name,
                    inspect.stack()[0][3],
                    type_,
                    value_,
                    traceback_,
                    comment,
                    self._simulation_data.debug,
                    err,
                )

        if struct.type == DatumType.record or struct.type == DatumType.string:
            # records only contain a single line
            storage.append_data(data_loaded)
            storage.data_dimensions.unlock()
            return [False, None, data_line]

        # get block recarray list for later processing
        recarrays = []
        parent_block = struct.parent_block
        if parent_block is not None:
            recarrays = parent_block.get_all_recarrays()
        recarray_len = len(recarrays)

        # loop until end of block
        line = " "
        optional_line_info = []
        line_info_processed = False
        data_structs = struct.data_item_structures
        while line != "":
            line = file_handle.readline()
            arr_line = PyListUtil.split_data_line(line)
            if not line or (
                arr_line
                and len(arr_line[0]) >= 2
                and arr_line[0][:3].upper() == "END"
            ):
                # end of block
                if store_data:
                    if store_internal:
                        # store as rec array
                        storage.store_internal(
                            data_loaded, None, False, key=current_key
                        )
                        storage.data_dimensions.unlock()
                        return [False, line, data_line]
                    else:
                        data_rec = storage._build_recarray(
                            data_loaded, current_key, True
                        )
                        storage.data_dimensions.unlock()
                        return data_rec
            if recarray_len != 1 and not MFComment.is_comment(arr_line, True):
                key = find_keyword(arr_line, struct.get_keywords())
                if key is None:
                    # unexpected text; may be the start of another record
                    if store_data:
                        if store_internal:
                            storage.store_internal(
                                data_loaded, None, False, current_key
                            )
                            storage.data_dimensions.unlock()
                            return [True, line, data_line]
                        else:
                            data_rec = storage._build_recarray(
                                data_loaded, current_key, True
                            )
                            storage.data_dimensions.unlock()
                            return data_rec
            self.simple_line = (
                self.simple_line
                and not self.structure.parent_block.parent_package.advanced_package()
            )
            if self.simple_line:
                line_len = len(self._last_line_info)
                if struct.num_optional > 0 and not line_info_processed:
                    line_info_processed = True
                    for index, data_item in enumerate(
                        struct.data_item_structures
                    ):
                        if index < line_len:
                            if data_item.optional:
                                self._last_line_info = self._last_line_info[
                                    :index
                                ]
                                line_len = len(self._last_line_info)
                                optional_line_info.append(data_item)
                        else:
                            optional_line_info.append(data_item)
                if MFComment.is_comment(arr_line, True):
                    arr_line.insert(0, "\n")
                    storage.add_data_line_comment(arr_line, line_num)
                else:
                    # do higher performance quick load
                    self._data_line = ()
                    cellid_index = 0
                    cellid_tuple = ()
                    data_index = 0
                    for index, entry in enumerate(self._last_line_info):
                        for sub_entry in entry:
                            if sub_entry[1] is not None:
                                if sub_entry[2] > 0:
                                    # is a cellid
                                    cellid_tuple += (
                                        int(arr_line[sub_entry[0]]) - 1,
                                    )
                                    # increment index
                                    cellid_index += 1
                                    if cellid_index == sub_entry[2]:
                                        # end of current cellid
                                        self._data_line += (cellid_tuple,)
                                        cellid_index = 0
                                        cellid_tuple = ()
                                else:
                                    # not a cellid
                                    self._data_line += (
                                        convert_data(
                                            arr_line[sub_entry[0]],
                                            self._data_dimensions,
                                            sub_entry[1],
                                            data_structs[index],
                                        ),
                                    )
                            else:
                                self._data_line += (None,)
                            data_index = sub_entry[0]
                    arr_line_len = len(arr_line)
                    if arr_line_len > data_index + 1:
                        # more data at the end of the line; see if it can
                        # be loaded as optional data
                        data_index += 1
                        for data_item in struct.data_item_structures[
                            len(self._last_line_info) :
                        ]:
                            if arr_line_len <= data_index:
                                break
                            if (
                                len(arr_line[data_index]) > 0
                                and arr_line[data_index][0] == "#"
                            ):
                                break
                            elif data_item.name == "aux":
                                (
                                    data_index,
                                    self._data_line,
                                ) = self._process_aux(
                                    storage,
                                    arr_line,
                                    arr_line_len,
                                    data_item,
                                    data_index,
                                    None,
                                    current_key,
                                    self._data_line,
                                    False,
                                )[0:2]
                            elif (
                                data_item.name == "boundname"
                                and self._data_dimensions.package_dim.boundnames()
                            ):
                                self._data_line += (
                                    convert_data(
                                        arr_line[data_index],
                                        self._data_dimensions,
                                        data_item.type,
                                        data_item,
                                    ),
                                )
                    if arr_line_len > data_index + 1:
                        # FEATURE: Keep number of white space characters
                        # used in comments section
                        storage.comments[line_num] = MFComment(
                            " ".join(arr_line[data_index + 1 :]),
                            struct.path,
                            self._simulation_data,
                            line_num,
                        )
                    data_loaded.append(self._data_line)
            else:
                try:
                    data_line = self.load_list_line(
                        storage,
                        arr_line,
                        line_num,
                        data_loaded,
                        False,
                        current_key=current_key,
                        data_line=data_line,
                    )[1]
                except Exception as ex:
                    comment = (
                        "Unable to process line {} of data list: "
                        '"{}"'.format(line_num + 1, line)
                    )
                    type_, value_, traceback_ = sys.exc_info()
                    raise MFDataException(
                        struct.get_model(),
                        struct.get_package(),
                        struct.path,
                        "loading data list from package file",
                        struct.name,
                        inspect.stack()[0][3],
                        type_,
                        value_,
                        traceback_,
                        comment,
                        self._simulation_data.debug,
                        ex,
                    )
            line_num += 1
        if store_data:
            # store as rec array
            storage.store_internal(data_loaded, None, False, current_key)
        storage.data_dimensions.unlock()
        self._data_dimensions.package_dim.locked = False
        if not store_internal:
            return data_rec
        else:
            return [False, None, data_line]

    def load_list_line(
        self,
        storage,
        arr_line,
        line_num,
        data_loaded,
        build_type_list,
        current_key,
        data_index_start=0,
        data_set=None,
        ignore_optional_vars=False,
        data_line=None,
        zero_based=False,
    ):
        data_item_ks = None
        struct = self.structure
        org_data_line = data_line
        # only initialize if we are at the start of a new line
        if data_index_start == 0:
            data_set = struct
            # new line of data
            data_line = ()
            # determine if at end of block
            if arr_line and arr_line[0][:3].upper() == "END":
                self.enabled = True
                return 0, data_line
        data_index = data_index_start
        arr_line_len = len(arr_line)
        if MFComment.is_comment(arr_line, True) and data_index_start == 0:
            arr_line.insert(0, "\n")
            storage.add_data_line_comment(arr_line, line_num)
        else:
            # read variables
            var_index = 0
            repeat_count = 0
            data = ""
            for data_item_index, data_item in enumerate(
                data_set.data_item_structures
            ):
                if not data_item.optional or not ignore_optional_vars:
                    if data_item.name == "aux":
                        data_index, data_line = self._process_aux(
                            storage,
                            arr_line,
                            arr_line_len,
                            data_item,
                            data_index,
                            var_index,
                            current_key,
                            data_line,
                        )[0:2]
                    # optional mname data items are only specified if the
                    # package is part of a model
                    elif (
                        not data_item.optional
                        or data_item.name[0:5] != "mname"
                        or not storage.in_model
                    ):
                        if data_item.type == DatumType.keyword:
                            data_index += 1
                            self.simple_line = False
                        elif data_item.type == DatumType.record:
                            # this is a record within a record; recurse
                            # into load_list_line to load it
                            data_index, data_line = self.load_list_line(
                                storage,
                                arr_line,
                                line_num,
                                data_loaded,
                                build_type_list,
                                current_key,
                                data_index,
                                data_item,
                                False,
                                data_line=data_line,
                                zero_based=zero_based,
                            )
                            self.simple_line = False
                        elif (
                            data_item.name != "boundname"
                            or self._data_dimensions.package_dim.boundnames()
                        ):
                            if data_item.optional and data == "#":
                                # comment mark found while expecting an
                                # optional data_item; we are done
                                break
                            if data_index >= arr_line_len:
                                if data_item.optional:
                                    break
                                else:
                                    unknown_repeats = (
                                        storage.resolve_shape_list(
                                            data_item,
                                            repeat_count,
                                            current_key,
                                            data_line,
                                        )[1]
                                    )
                                    if unknown_repeats:
                                        break
                                break
                            more_data_expected = True
                            unknown_repeats = False
                            repeat_count = 0
                            while more_data_expected or unknown_repeats:
                                if data_index >= arr_line_len:
                                    if data_item.optional or unknown_repeats:
                                        break
                                    elif (
                                        struct.num_optional
                                        >= len(data_set.data_item_structures)
                                        - data_item_index
                                    ):
                                        # there are enough optional
                                        # variables to account for the lack
                                        # of data; reload the line with all
                                        # optional variables ignored
                                        data_line = org_data_line
                                        return self.load_list_line(
                                            storage,
                                            arr_line,
                                            line_num,
                                            data_loaded,
                                            build_type_list,
                                            current_key,
                                            data_index_start,
                                            data_set,
                                            True,
                                            data_line=data_line,
                                            zero_based=zero_based,
                                        )
                                    else:
                                        comment = (
                                            "Not enough data provided "
                                            "for {}. Data for required "
                                            'data item "{}" not '
                                            "found".format(
                                                struct.name, data_item.name
                                            )
                                        )
                                        (
                                            type_,
                                            value_,
                                            traceback_,
                                        ) = sys.exc_info()
                                        raise MFDataException(
                                            struct.get_model(),
                                            struct.get_package(),
                                            struct.path,
                                            "loading data list from "
                                            "package file",
                                            struct.name,
                                            inspect.stack()[0][3],
                                            type_,
                                            value_,
                                            traceback_,
                                            comment,
                                            self._simulation_data.debug,
                                        )
                                data = arr_line[data_index]
                                repeat_count += 1
                                if data_item.type == DatumType.keystring:
                                    self.simple_line = False
                                    if repeat_count <= 1:
                                        # only process the keyword on the
                                        # first repeat; find the data item
                                        # associated with the correct
                                        # keystring
                                        name_data = data.lower()
                                        if (
                                            name_data
                                            not in data_item.keystring_dict
                                        ):
                                            name_data = f"{name_data}record"
                                            if (
                                                name_data
                                                not in data_item.keystring_dict
                                            ):
                                                # look for data key in
                                                # child records
                                                found = False
                                                for (
                                                    key,
                                                    record,
                                                ) in data_item.keystring_dict.items():
                                                    if (
                                                        isinstance(
                                                            record,
                                                            MFDataStructure,
                                                        )
                                                        and len(
                                                            record.data_item_structures
                                                        )
                                                        > 0
                                                        and record.data_item_structures[
                                                            0
                                                        ].name
                                                        == data.lower()
                                                    ):
                                                        name_data = key
                                                        found = True
                                                        break
                                                # data does not match any
                                                # expected keywords
                                                if not found:
                                                    if (
                                                        self._simulation_data.verbosity_level.value
                                                        >= VerbosityLevel.normal.value
                                                    ):
                                                        print(
                                                            "WARNING: Failed"
                                                            " to process"
                                                            " line {}. Line"
                                                            " does not match"
                                                            " expected"
                                                            " keystring"
                                                            " {}".format(
                                                                " ".join(
                                                                    arr_line
                                                                ),
                                                                data_item.name,
                                                            )
                                                        )
                                                    break
                                        data_item_ks = (
                                            data_item.keystring_dict[
                                                name_data
                                            ]
                                        )
                                        if data_item_ks == 0:
                                            comment = (
                                                "Could not find keystring "
                                                f"{name_data}."
                                            )
                                            (
                                                type_,
                                                value_,
                                                traceback_,
                                            ) = sys.exc_info()
                                            raise MFDataException(
                                                struct.get_model(),
                                                struct.get_package(),
                                                struct.path,
                                                "loading data list from "
                                                "package file",
                                                struct.name,
                                                inspect.stack()[0][3],
                                                type_,
                                                value_,
                                                traceback_,
                                                comment,
                                                self._simulation_data.debug,
                                            )
                                        # the keyword is always implied in
                                        # a keystring and should be stored;
                                        # add a string data_item for the
                                        # keyword
                                        if data_item.name in self._temp_dict:
                                            # use cached data item for
                                            # performance
                                            keyword_data_item = (
                                                self._temp_dict[
                                                    data_item.name
                                                ]
                                            )
                                        else:
                                            keyword_data_item = deepcopy(
                                                data_item
                                            )
                                            keyword_data_item.type = (
                                                DatumType.string
                                            )
                                            self._temp_dict[
                                                data_item.name
                                            ] = keyword_data_item
                                        (
                                            data_index,
                                            more_data_expected,
                                            data_line,
                                            unknown_repeats,
                                        ) = self._append_data_list(
                                            storage,
                                            keyword_data_item,
                                            arr_line,
                                            arr_line_len,
                                            data_index,
                                            var_index,
                                            repeat_count,
                                            current_key,
                                            data_line,
                                        )
                                    if isinstance(
                                        data_item_ks, MFDataStructure
                                    ):
                                        dis = (
                                            data_item_ks.data_item_structures
                                        )
                                        for idx, ks_data_item in enumerate(
                                            dis
                                        ):
                                            if (
                                                ks_data_item.type
                                                != DatumType.keyword
                                                and data_index < arr_line_len
                                            ):
                                                # data item contains
                                                # additional information
                                                (
                                                    data_index,
                                                    more_data_expected,
                                                    data_line,
                                                    unknown_repeats,
                                                ) = self._append_data_list(
                                                    storage,
                                                    ks_data_item,
                                                    arr_line,
                                                    arr_line_len,
                                                    data_index,
                                                    var_index,
                                                    repeat_count,
                                                    current_key,
                                                    data_line,
                                                    zero_based=zero_based,
                                                )
                                            elif (
                                                idx > 0
                                                and ks_data_item.type
                                                == DatumType.keyword
                                            ):
                                                data_index += 1
                                        while data_index < arr_line_len:
                                            try:
                                                # append remaining data
                                                # (temporary fix)
                                                (
                                                    data_index,
                                                    more_data_expected,
                                                    data_line,
                                                    unknown_repeats,
                                                ) = self._append_data_list(
                                                    storage,
                                                    ks_data_item,
                                                    arr_line,
                                                    arr_line_len,
                                                    data_index,
                                                    var_index,
                                                    repeat_count,
                                                    current_key,
                                                    data_line,
                                                    zero_based=zero_based,
                                                )
                                            except MFDataException:
                                                break
                                    else:
                                        if (
                                            data_item_ks.type
                                            != DatumType.keyword
                                        ):
                                            (
                                                data_index,
                                                more_data_expected,
                                                data_line,
                                                unknown_repeats,
                                            ) = self._append_data_list(
                                                storage,
                                                data_item_ks,
                                                arr_line,
                                                arr_line_len,
                                                data_index,
                                                var_index,
                                                repeat_count,
                                                current_key,
                                                data_line,
                                                zero_based=zero_based,
                                            )
                                        else:
                                            # append empty data as a
                                            # placeholder.  this is
                                            # necessary to keep the
                                            # recarray a consistent shape
                                            data_line = data_line + (None,)
                                            data_index += 1
                                else:
                                    if (
                                        data_item.tagged
                                        and repeat_count == 1
                                    ):
                                        # data item is tagged; include the
                                        # data item name as a keyword
                                        di_type = data_item.type
                                        data_item.type = DatumType.keyword
                                        (
                                            data_index,
                                            more_data_expected,
                                            data_line,
                                            unknown_repeats,
                                        ) = self._append_data_list(
                                            storage,
                                            data_item,
                                            arr_line,
                                            arr_line_len,
                                            data_index,
                                            var_index,
                                            repeat_count,
                                            current_key,
                                            data_line,
                                            zero_based=zero_based,
                                        )
                                        data_item.type = di_type
                                    (
                                        data_index,
                                        more_data_expected,
                                        data_line,
                                        unknown_repeats,
                                    ) = self._append_data_list(
                                        storage,
                                        data_item,
                                        arr_line,
                                        arr_line_len,
                                        data_index,
                                        var_index,
                                        repeat_count,
                                        current_key,
                                        data_line,
                                        zero_based=zero_based,
                                    )
                                if more_data_expected is None:
                                    # indeterminate amount of data expected;
                                    # keep reading data until eoln
                                    more_data_expected = (
                                        data_index < arr_line_len
                                    )
                                self.simple_line = (
                                    self.simple_line
                                    and not unknown_repeats
                                    and (
                                        len(data_item.shape) == 0
                                        or data_item.is_cellid
                                    )
                                )
                    var_index += 1

            # populate unused optional variables with None type
            for data_item in data_set.data_item_structures[var_index:]:
                if data_item.name == "aux":
                    data_line = self._process_aux(
                        storage,
                        arr_line,
                        arr_line_len,
                        data_item,
                        data_index,
                        var_index,
                        current_key,
                        data_line,
                    )[1]
                elif (
                    data_item.name != "boundname"
                    or self._data_dimensions.package_dim.boundnames()
                ):
                    (
                        data_index,
                        more_data_expected,
                        data_line,
                        unknown_repeats,
                    ) = self._append_data_list(
                        storage,
                        data_item,
                        None,
                        0,
                        data_index,
                        var_index,
                        1,
                        current_key,
                        data_line,
                        zero_based=zero_based,
                    )

        # only do final processing on the outer-most record
        if data_index_start == 0:
            # if more pieces exist
            if arr_line_len > data_index + 1:
                # FEATURE: Keep number of white space characters used in
                # comments section
                storage.comments[line_num] = MFComment(
                    " ".join(arr_line[data_index + 1 :]),
                    struct.path,
                    self._simulation_data,
                    line_num,
                )
            data_loaded.append(data_line)
        return data_index, data_line

    def _process_aux(
        self,
        storage,
        arr_line,
        arr_line_len,
        data_item,
        data_index,
        var_index,
        current_key,
        data_line,
        add_to_last_line=True,
    ):
        aux_var_names = self._data_dimensions.package_dim.get_aux_variables()
        more_data_expected = False
        if aux_var_names is not None:
            for var_name in aux_var_names[0]:
                if var_name.lower() != "auxiliary":
                    if data_index >= arr_line_len:
                        # store placeholder None
                        (
                            data_index,
                            more_data_expected,
                            data_line,
                        ) = self._append_data_list(
                            storage,
                            data_item,
                            None,
                            0,
                            data_index,
                            var_index,
                            1,
                            current_key,
                            data_line,
                            add_to_last_line,
                        )[0:3]
                    else:
                        # read in aux variables
                        (
                            data_index,
                            more_data_expected,
                            data_line,
                        ) = self._append_data_list(
                            storage,
                            data_item,
                            arr_line,
                            arr_line_len,
                            data_index,
                            var_index,
                            0,
                            current_key,
                            data_line,
                            add_to_last_line,
                        )[0:3]
        return data_index, data_line, more_data_expected

    def _append_data_list(
        self,
        storage,
        data_item,
        arr_line,
        arr_line_len,
        data_index,
        var_index,
        repeat_count,
        current_key,
        data_line,
        add_to_last_line=True,
        zero_based=False,
    ):
        if zero_based:
            sub_amt = 0
        else:
            sub_amt = 1
        # append to a 2-D list which will later be converted to a numpy
        # rec array
        struct = self.structure
        if add_to_last_line:
            self._last_line_info.append([])
        if data_item.is_cellid or (
            data_item.possible_cellid
            and storage._validate_cellid(arr_line, data_index, data_item)
        ):
            if self._data_dimensions is None:
                comment = (
                    "CellID field specified for data "
                    '"{}" field "{}" which does not contain a model '
                    "grid. This could be due to a problem with "
                    "the flopy definition files. Please get the "
                    "latest flopy definition files"
                    ".".format(struct.name, data_item.name)
                )
                type_, value_, traceback_ = sys.exc_info()
                raise MFDataException(
                    struct.get_model(),
                    struct.get_package(),
                    struct.path,
                    "loading data list from package file",
                    struct.name,
                    inspect.stack()[0][3],
                    type_,
                    value_,
                    traceback_,
                    comment,
                    self._simulation_data.debug,
                )
            # in case of multiple model grids, determine which one to use
            model_num = DatumUtil.cellid_model_num(
                data_item.name,
                struct.model_data,
                self._data_dimensions.package_dim.model_dim,
            )
            model_grid = self._data_dimensions.get_model_grid(
                model_num=model_num
            )
            # read in the entire cellid
            cellid_size = model_grid.get_num_spatial_coordinates()
            cellid_tuple = ()
            if (
                not DatumUtil.is_int(arr_line[data_index])
                and arr_line[data_index].lower() == "none"
            ):
                # special case where cellid is 'none', store as 'none'
                cellid_tuple = "none"
                if add_to_last_line:
                    self._last_line_info[-1].append(
                        [data_index, data_item.type, cellid_size]
                    )
                new_index = data_index + 1
            else:
                # handle regular cellid
                if cellid_size + data_index > arr_line_len:
                    comment = (
                        "Not enough data found when reading cell ID "
                        'in data "{}" field "{}". Expected {} items '
                        "and found {} items"
                        ".".format(
                            struct.name,
                            data_item.name,
                            cellid_size,
                            arr_line_len - data_index,
                        )
                    )
                    type_, value_, traceback_ = sys.exc_info()
                    raise MFDataException(
                        struct.get_model(),
                        struct.get_package(),
                        struct.path,
                        "loading data list from package file",
                        struct.name,
                        inspect.stack()[0][3],
                        type_,
                        value_,
                        traceback_,
                        comment,
                        self._simulation_data.debug,
                    )
                for index in range(data_index, cellid_size + data_index):
                    if (
                        not DatumUtil.is_int(arr_line[index])
                        or int(arr_line[index]) < 0
                    ):
                        comment = (
                            "Expected an integer or cell ID in "
                            'data "{}" field "{}". Found {} '
                            'in line "{}".'.format(
                                struct.name,
                                data_item.name,
                                arr_line[index],
                                arr_line,
                            )
                        )
                        type_, value_, traceback_ = sys.exc_info()
                        raise MFDataException(
                            struct.get_model(),
                            struct.get_package(),
                            struct.path,
                            "loading data list from package file",
                            struct.name,
                            inspect.stack()[0][3],
                            type_,
                            value_,
                            traceback_,
                            comment,
                            self._simulation_data.debug,
                        )
                    data_converted = convert_data(
                        arr_line[index], self._data_dimensions, data_item.type
                    )
                    cellid_tuple = cellid_tuple + (
                        int(data_converted) - sub_amt,
                    )
                    if add_to_last_line:
                        self._last_line_info[-1].append(
                            [index, data_item.type, cellid_size]
                        )
                new_index = data_index + cellid_size
            data_line = data_line + (cellid_tuple,)

            if (
                data_item.shape is not None
                and len(data_item.shape) > 0
                and data_item.shape[0] == "ncelldim"
            ):
                # shape is the coordinate shape, which has already been read
                more_data_expected = False
                unknown_repeats = False
            else:
                (
                    more_data_expected,
                    unknown_repeats,
                ) = storage.resolve_shape_list(
                    data_item, repeat_count, current_key, data_line
                )
            return new_index, more_data_expected, data_line, unknown_repeats
        else:
            if arr_line is None:
                data_converted = None
                if add_to_last_line:
                    self._last_line_info[-1].append(
                        [data_index, data_item.type, 0]
                    )
            else:
                if (
                    arr_line[data_index].lower()
                    in self._data_dimensions.package_dim.get_tsnames()
                ):
                    # references a time series, store as is
                    data_converted = arr_line[data_index].lower()
                    # override recarray data type to support writing
                    # string values
                    storage.override_data_type(var_index, object)
                    if add_to_last_line:
                        self._last_line_info[-1].append(
                            [data_index, DatumType.string, 0]
                        )
                else:
                    data_converted = convert_data(
                        arr_line[data_index],
                        self._data_dimensions,
                        data_item.type,
                        data_item,
                        sub_amt=sub_amt,
                    )
                    if (
                        data_item.indicates_file_name()
                        or data_item.file_nam_in_nam_file()
                    ):
                        data_converted = datautil.clean_filename(
                            data_converted
                        )
                    if add_to_last_line:
                        self._last_line_info[-1].append(
                            [data_index, data_item.type, 0]
                        )
            data_line = data_line + (data_converted,)
            more_data_expected, unknown_repeats = storage.resolve_shape_list(
                data_item, repeat_count, current_key, data_line
            )
            return (
                data_index + 1,
                more_data_expected,
                data_line,
                unknown_repeats,
            )

class MFFileAccessScalar(MFFileAccess):
    def __init__(
        self, structure, data_dimensions, simulation_data, path, current_key
    ):
        super().__init__(
            structure, data_dimensions, simulation_data, path, current_key
        )

    def load_from_package(
        self,
        first_line,
        file_handle,
        storage,
        data_type,
        keyword,
        pre_data_comments=None,
    ):
        # read in any pre data comments
        current_line = self._read_pre_data_comments(
            first_line, file_handle, pre_data_comments, storage
        )
        datautil.PyListUtil.reset_delimiter_used()
        arr_line = datautil.PyListUtil.split_data_line(current_line)
        # verify keyword
        index_num = self._load_keyword(arr_line, 0, keyword)[0]

        # store data
        datatype = self.structure.get_datatype()
        if self.structure.type == DatumType.record:
            index = 0
            for data_item_type in self.structure.get_data_item_types():
                optional = self.structure.data_item_structures[index].optional
                if (
                    len(arr_line) <= index + 1
                    or data_item_type[0] != DatumType.keyword
                    or (index > 0 and optional is True)
                ):
                    break
                index += 1
            first_type = self.structure.get_data_item_types()[0]
            if first_type[0] == DatumType.keyword:
                converted_data = [True]
            else:
                converted_data = []
            if first_type[0] != DatumType.keyword or index == 1:
                if (
                    self.structure.get_data_item_types()[1]
                    != DatumType.keyword
                    or arr_line[index].lower()
                    == self.structure.data_item_structures[index].name
                ):
                    try:
                        converted_data.append(
                            convert_data(
                                arr_line[index],
                                self._data_dimensions,
                                self.structure.data_item_structures[
                                    index
                                ].type,
                                self.structure.data_item_structures[0],
                            )
                        )
                    except Exception as ex:
                        message = (
                            'Could not convert "{}" of type "{}" '
                            "to a string.".format(
                                arr_line[index],
                                self.structure.data_item_structures[
                                    index
                                ].type,
                            )
                        )
                        type_, value_, traceback_ = sys.exc_info()
                        raise MFDataException(
                            self.structure.get_model(),
                            self.structure.get_package(),
                            self._path,
                            "converting data to string",
                            self.structure.name,
                            inspect.stack()[0][3],
                            type_,
                            value_,
                            traceback_,
                            message,
                            self._simulation_data.debug,
                            ex,
                        )
            try:
                storage.set_data(converted_data, key=self._current_key)
                index_num += 1
            except Exception as ex:
                message = 'Could not set data "{}" with key "{}".'.format(
                    converted_data, self._current_key
                )
                type_, value_, traceback_ = sys.exc_info()
                raise MFDataException(
                    self.structure.get_model(),
                    self.structure.get_package(),
                    self._path,
                    "setting data",
                    self.structure.name,
                    inspect.stack()[0][3],
                    type_,
                    value_,
                    traceback_,
                    message,
                    self._simulation_data.debug,
                    ex,
                )
        elif (
            datatype == DataType.scalar_keyword
            or datatype == DataType.scalar_keyword_transient
        ):
            # store as true
            try:
                storage.set_data(True, key=self._current_key)
            except Exception as ex:
                message = (
                    f'Could not set data "True" with key '
                    f'"{self._current_key}".'
                )
                type_, value_, traceback_ = sys.exc_info()
                raise MFDataException(
                    self.structure.get_model(),
                    self.structure.get_package(),
                    self._path,
                    "setting data",
                    self.structure.name,
                    inspect.stack()[0][3],
                    type_,
                    value_,
                    traceback_,
                    message,
                    self._simulation_data.debug,
                    ex,
                )
        else:
            data_item_struct = self.structure.data_item_structures[0]
            if len(arr_line) < 1 + index_num:
                message = (
                    'Error reading variable "{}". Expected data '
                    'after label "{}" not found at line '
                    '"{}".'.format(
                        self.structure.name,
                        data_item_struct.name.lower(),
                        current_line,
                    )
                )
                type_, value_, traceback_ = sys.exc_info()
                raise MFDataException(
                    self.structure.get_model(),
                    self.structure.get_package(),
                    self._path,
                    "loading data from file",
                    self.structure.name,
                    inspect.stack()[0][3],
                    type_,
                    value_,
                    traceback_,
                    message,
                    self._simulation_data.debug,
                )
            try:
                # read the next word as data
                converted_data = convert_data(
                    arr_line[index_num],
                    self._data_dimensions,
                    data_type,
                    data_item_struct,
                )
            except Exception as ex:
                message = (
                    'Could not convert "{}" of type "{}" '
                    "to a string.".format(arr_line[index_num], data_type)
                )
                type_, value_, traceback_ = sys.exc_info()
                raise MFDataException(
                    self.structure.get_model(),
                    self.structure.get_package(),
                    self._path,
                    "converting data to string",
                    self.structure.name,
                    inspect.stack()[0][3],
                    type_,
                    value_,
                    traceback_,
                    message,
                    self._simulation_data.debug,
                    ex,
                )
            try:
                storage.set_data(converted_data, key=self._current_key)
            except Exception as ex:
                message = 'Could not set data "{}" with key "{}".'.format(
                    converted_data, self._current_key
                )
                type_, value_, traceback_ = sys.exc_info()
                raise MFDataException(
                    self.structure.get_model(),
                    self.structure.get_package(),
                    self._path,
                    "setting data",
                    self.structure.name,
                    inspect.stack()[0][3],
                    type_,
                    value_,
                    traceback_,
                    message,
                    self._simulation_data.debug,
                    ex,
                )
            index_num += 1

        if len(arr_line) > index_num:
            # save remainder of line as comment
            storage.add_data_line_comment(arr_line[index_num:], 0)
        return [False, None]
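
# --- Illustrative sketch (editor-added, not part of flopy) ------------------
# Builds a DIS-style binary array header the same way
# MFFileAccessArray._get_header does for a structured grid, using only the
# BinaryHeader.create keyword arguments shown above.  The dimensions and the
# record text "HEAD" are hypothetical example values.
def _example_binary_array_header():
    return BinaryHeader.create(
        bintype="vardis",
        precision="double",
        text="HEAD",
        m1=10,  # ncol
        m2=5,  # nrow
        m3=1,  # ilay
        pertim=np.float64(1.0),
        totim=np.float64(1.0),
        kstp=1,
        kper=1,
    )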