Source code for flopy.mf6.data.mfdataplist

import copy
import inspect
import io
import os
import sys

import numpy as np
import pandas

from ...datbase import DataListInterface, DataType
from ...discretization.structuredgrid import StructuredGrid
from ...discretization.unstructuredgrid import UnstructuredGrid
from ...discretization.vertexgrid import VertexGrid
from ...utils import datautil
from ..data import mfdata
from ..mfbase import ExtFileAction, MFDataException, VerbosityLevel
from ..utils.mfenums import DiscretizationType
from .mfdatalist import MFList
from .mfdatastorage import DataStorageType, DataStructureType
from .mfdatautil import list_to_array, process_open_close_line
from .mffileaccess import MFFileAccessList
from .mfstructure import DatumType, MFDataStructure


class PandasListStorage:
    """
    Contains data storage information for a single list.

    Attributes
    ----------
    internal_data : ndarray or recarray
        data being stored, if full data is being stored internally in memory
    data_storage_type : DataStorageType
        method used to store the data
    fname : str
        file name of external file containing the data
    factor : int/float
        factor to multiply the data by
    iprn : int
        print code
    binary : bool
        whether the data is stored in a binary file
    modified : bool
        whether data in storage has been modified since last write

    Methods
    -------
    get_record : dict
        returns a dictionary with all data storage information
    set_record(rec)
        sets data storage information based on the dictionary "rec"
    set_internal(internal_data)
        make data storage internal, using "internal_data" as the data
    set_external(fname, data)
        make data storage external, with file "fname" and external data
        "data"
    internal_size : int
        size of the internal data
    has_data : bool
        whether or not data exists
    """

    def __init__(self):
        self.internal_data = None
        self.fname = None
        self.iprn = None
        self.binary = False
        self.data_storage_type = None
        self.modified = False

    def __repr__(self):
        return self.get_data_str(True)

    def __str__(self):
        return self.get_data_str(False)

    def _get_header_str(self):
        header_list = []
        if self.data_storage_type == DataStorageType.external_file:
            header_list.append(f"open/close {self.fname}")
        else:
            header_list.append("internal")
        if self.iprn is not None:
            header_list.append(f"iprn {self.iprn}")
        if len(header_list) > 0:
            return ", ".join(header_list)
        else:
            return ""
    def get_data_str(self, formal):
        data_str = ""
        layer_str = ""
        if self.data_storage_type == DataStorageType.internal_array:
            if self.internal_data is not None:
                header = self._get_header_str()
                if formal:
                    data_str = "{}{}{{{}}}\n({})\n".format(
                        data_str,
                        layer_str,
                        header,
                        repr(self.internal_data),
                    )
                else:
                    data_str = "{}{}{{{}}}\n({})\n".format(
                        data_str,
                        layer_str,
                        header,
                        str(self.internal_data),
                    )
        elif self.data_storage_type == DataStorageType.external_file:
            header = self._get_header_str()
            data_str = "{}{}{{{}}}\n({})\n".format(
                data_str,
                layer_str,
                header,
                "External data not displayed",
            )
        return data_str
    def get_record(self):
        rec = {}
        if self.internal_data is not None:
            rec["data"] = copy.deepcopy(self.internal_data)
        if self.fname is not None:
            rec["filename"] = self.fname
        if self.iprn is not None:
            rec["iprn"] = self.iprn
        if self.binary is not None:
            rec["binary"] = self.binary
        return rec
    def set_record(self, rec):
        if "data" in rec:
            self.internal_data = rec["data"]
        if "filename" in rec:
            self.fname = rec["filename"]
        if "iprn" in rec:
            self.iprn = rec["iprn"]
        if "binary" in rec:
            self.binary = rec["binary"]
    def set_internal(self, internal_data):
        self.data_storage_type = DataStorageType.internal_array
        self.internal_data = internal_data
        self.fname = None
        self.binary = False
    def set_external(self, fname, data=None):
        self.data_storage_type = DataStorageType.external_file
        self.internal_data = data
        self.fname = fname
    @property
    def internal_size(self):
        if not isinstance(self.internal_data, pandas.DataFrame):
            return 0
        else:
            return len(self.internal_data)
    def has_data(self):
        if self.data_storage_type == DataStorageType.internal_array:
            return self.internal_data is not None
        else:
            return self.fname is not None
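
# Usage sketch (illustrative only, not part of the module): the record
# interface of PandasListStorage round-trips data and metadata as a dict.
# The DataFrame contents below are hypothetical.
#
#     storage = PandasListStorage()
#     storage.set_internal(
#         pandas.DataFrame({"node": [1, 2], "q": [-50.0, -25.0]})
#     )
#     rec = storage.get_record()   # {"data": <DataFrame>, "binary": False}
#     storage.set_record(rec)      # restores the same storage state
#     assert storage.has_data()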
class MFPandasList(mfdata.MFMultiDimVar, DataListInterface):
    """
    Provides an interface for the user to access and update MODFLOW list
    data using Pandas.  MFPandasList objects are not designed to be
    directly constructed by the end user.  When a flopy for MODFLOW 6
    package object is constructed, the appropriate MFList objects are
    automatically built.

    Parameters
    ----------
    sim_data : MFSimulationData
        data contained in the simulation
    model_or_sim : MFSimulation or MFModel
        parent model, or if not part of a model, parent simulation
    structure : MFDataStructure
        describes the structure of the data
    data : list or ndarray or None
        actual data
    enable : bool
        enable/disable the array
    path : tuple
        path in the data dictionary to this MFArray
    dimensions : MFDataDimensions
        dimension information related to the model, package, and array
    package : MFPackage
        parent package
    block : MFBlock
        parent block
    """

    def __init__(
        self,
        sim_data,
        model_or_sim,
        structure,
        data=None,
        enable=None,
        path=None,
        dimensions=None,
        package=None,
        block=None,
    ):
        super().__init__(
            sim_data, model_or_sim, structure, enable, path, dimensions
        )
        self._data_storage = self._new_storage()
        self._package = package
        self._block = block
        self._last_line_info = []
        self._data_line = None
        self._temp_dict = {}
        self._crnt_line_num = 1
        self._data_header = None
        self._header_names = None
        self._data_types = None
        self._data_item_names = None
        self._mg = None
        self._current_key = 0
        self._max_file_size = 1000000000000000
        if self._model_or_sim.type == "Model":
            self._mg = self._model_or_sim.modelgrid
        if data is not None:
            try:
                self.set_data(data, True)
            except Exception as ex:
                type_, value_, traceback_ = sys.exc_info()
                raise MFDataException(
                    structure.get_model(),
                    structure.get_package(),
                    path,
                    "setting data",
                    structure.name,
                    inspect.stack()[0][3],
                    type_,
                    value_,
                    traceback_,
                    None,
                    sim_data.debug,
                    ex,
                )

    @property
    def data_type(self):
        """Type of data (DataType) stored in the list"""
        return DataType.list

    @property
    def package(self):
        """Package object that this data belongs to."""
        return self._package

    @property
    def dtype(self):
        """Type of data (numpy.dtype) stored in the list"""
        return self.get_dataframe().dtypes

    def _append_type_list(self, data_name, data_type, include_header=False):
        if include_header:
            self._data_header[data_name] = data_type
        self._header_names.append(data_name)
        self._data_types.append(data_type)

    def _process_open_close_line(self, arr_line, store=True):
        """
        Process open/close line extracting the multiplier, print format,
        binary flag, data file path, and any comments
        """
        data_dim = self.data_dimensions
        (
            multiplier,
            print_format,
            binary,
            data_file,
            data,
            comment,
        ) = process_open_close_line(
            arr_line,
            data_dim,
            self._data_type,
            self._simulation_data.debug,
            store,
        )

        # add to active list of external files
        model_name = data_dim.package_dim.model_dim[0].model_name
        self._simulation_data.mfpath.add_ext_file(data_file, model_name)

        return data, multiplier, print_format, binary, data_file

    def _add_cellid_fields(self, data, keep_existing=False):
        """
        Add cellid fields to a Pandas DataFrame and drop the layer, row,
        column, cell, node fields that the cellid is based on
        """
        for data_item in self.structure.data_item_structures:
            if data_item.type == DatumType.integer:
                if data_item.name.lower() == "cellid":
                    columns = data.columns.tolist()
                    if isinstance(self._mg, StructuredGrid):
                        if (
                            "layer" in columns
                            and "row" in columns
                            and "column" in columns
                        ):
                            data["cellid"] = data[
                                ["layer", "row", "column"]
                            ].apply(tuple, axis=1)
                            if not keep_existing:
                                data = data.drop(
                                    columns=["layer", "row", "column"]
                                )
                    elif isinstance(self._mg, VertexGrid):
                        cell_2 = None
                        if "cell" in columns:
                            cell_2 = "cell"
                        elif "ncpl" in columns:
                            cell_2 = "ncpl"
                        if cell_2 is not None and "layer" in columns:
                            data["cellid"] = data[["layer", cell_2]].apply(
                                tuple, axis=1
                            )
                            if not keep_existing:
                                data = data.drop(columns=["layer", cell_2])
                    elif isinstance(self._mg, UnstructuredGrid):
                        if "node" in columns:
                            data["cellid"] = data[["node"]].apply(
                                tuple, axis=1
                            )
                            if not keep_existing:
                                data = data.drop(columns=["node"])
                    else:
                        raise MFDataException(
                            f"ERROR: Unrecognized model grid {str(self._mg)} "
                            "not supported by MFPandasList"
                        )
                    # reorder columns, moving the new cellid column first
                    column_headers = data.columns.tolist()
                    column_headers.insert(0, column_headers.pop())
                    data = data[column_headers]
        return data

    def _remove_cellid_fields(self, data):
        """remove cellid fields from data"""
        for data_item in self.structure.data_item_structures:
            if data_item.type == DatumType.integer:
                if data_item.name.lower() == "cellid":
                    # if there is a cellid field, remove it
                    if "cellid" in data.columns:
                        return data.drop("cellid", axis=1)
        return data

    def _get_cellid_size(self, data_item_name):
        """get the number of spatial coordinates used in the cellid"""
        model_num = datautil.DatumUtil.cellid_model_num(
            data_item_name,
            self.data_dimensions.structure.model_data,
            self.data_dimensions.package_dim.model_dim,
        )
        model_grid = self.data_dimensions.get_model_grid(model_num=model_num)
        return model_grid.get_num_spatial_coordinates()

    def _build_data_header(self):
        """
        Constructs lists of data column header names and data column types
        based on data structure information, boundname and aux information,
        and model discretization type.
        """
        # initialize
        self._data_header = {}
        self._header_names = []
        self._data_types = []
        self._data_item_names = []
        s_type = pandas.StringDtype
        f_type = np.float64
        i_type = np.int64
        data_dim = self.data_dimensions
        # loop through data structure definition information
        for data_item, index in zip(
            self.structure.data_item_structures,
            range(0, len(self.structure.data_item_structures)),
        ):
            if data_item.name.lower() == "aux":
                # get all of the aux variables for this dataset
                aux_var_names = data_dim.package_dim.get_aux_variables()
                if aux_var_names is not None:
                    for aux_var_name in aux_var_names[0]:
                        if aux_var_name.lower() != "auxiliary":
                            self._append_type_list(aux_var_name, f_type)
                            self._data_item_names.append(aux_var_name)
            elif data_item.name.lower() == "boundname":
                # see if boundnames is enabled for this dataset
                if data_dim.package_dim.boundnames():
                    self._append_type_list("boundname", s_type)
                    self._data_item_names.append("boundname")
            else:
                if data_item.type == DatumType.keyword:
                    self._append_type_list(data_item.name, s_type)
                elif data_item.type == DatumType.string:
                    self._append_type_list(data_item.name, s_type)
                elif data_item.type == DatumType.integer:
                    if data_item.name.lower() == "cellid":
                        # get the appropriate cellid column headings for
                        # the model's discretization type
                        if isinstance(self._mg, StructuredGrid):
                            self._append_type_list("layer", i_type, True)
                            self._append_type_list("row", i_type, True)
                            self._append_type_list("column", i_type, True)
                        elif isinstance(self._mg, VertexGrid):
                            self._append_type_list("layer", i_type, True)
                            self._append_type_list("cell", i_type, True)
                        elif isinstance(self._mg, UnstructuredGrid):
                            self._append_type_list("node", i_type, True)
                        else:
                            raise MFDataException(
                                "ERROR: Unrecognized model grid "
                                f"{str(self._mg)} not supported by "
                                "MFPandasList"
                            )
                    else:
                        self._append_type_list(data_item.name, i_type)
                elif data_item.type == DatumType.double_precision:
                    self._append_type_list(data_item.name, f_type)
                else:
                    self._data_header = None
                    self._header_names = None
                self._data_item_names.append(data_item.name)

    @staticmethod
    def _unique_column_name(data, col_base_name):
        """generate a unique column name based on "col_base_name" """
        col_name = col_base_name
        idx = 2
        while col_name in data:
            col_name = f"{col_base_name}_{idx}"
            idx += 1
        return col_name

    @staticmethod
    def _untuple_manually(pdata, loc, new_column_name, column_name, index):
        """
        Loop through a pandas DataFrame removing tuples from cellid
        columns.  Used when the pandas "insert" method to perform the same
        task fails.
        """
        # build new column list
        new_column = []
        for idx, row in pdata.iterrows():
            if isinstance(row[column_name], tuple) or isinstance(
                row[column_name], list
            ):
                new_column.append(row[column_name][index])
            else:
                new_column.append(row[column_name])

        # insert list as new column
        pdata.insert(
            loc=loc,
            column=new_column_name,
            value=new_column,
        )

    def _untuple_cellids(self, pdata):
        """
        For all cellids in "pdata", convert them to layer, row, column
        fields and then drop the cellids from "pdata".  Returns the updated
        "pdata".
        """
        if pdata is None or len(pdata) == 0:
            return pdata, 0
        fields_to_correct = []
        data_idx = 0
        # find cellid columns that need to be fixed
        columns = pdata.columns
        for data_item in self.structure.data_item_structures:
            if data_idx >= len(columns) + 1:
                break
            if (
                data_item.type == DatumType.integer
                and data_item.name.lower() == "cellid"
            ):
                if isinstance(pdata.iloc[0, data_idx], tuple):
                    fields_to_correct.append((data_idx, columns[data_idx]))
                    data_idx += 1
                else:
                    data_idx += self._get_cellid_size(data_item.name)
            else:
                data_idx += 1

        # fix columns
        for field_idx, column_name in fields_to_correct:
            # add individual layer/row/column/cell/node columns
            if isinstance(self._mg, StructuredGrid):
                try:
                    pdata.insert(
                        loc=field_idx,
                        column=self._unique_column_name(pdata, "layer"),
                        value=pdata.apply(
                            lambda x: x[column_name][0], axis=1
                        ),
                    )
                except (ValueError, TypeError):
                    self._untuple_manually(
                        pdata,
                        field_idx,
                        self._unique_column_name(pdata, "layer"),
                        column_name,
                        0,
                    )
                try:
                    pdata.insert(
                        loc=field_idx + 1,
                        column=self._unique_column_name(pdata, "row"),
                        value=pdata.apply(
                            lambda x: x[column_name][1], axis=1
                        ),
                    )
                except (ValueError, TypeError):
                    self._untuple_manually(
                        pdata,
                        field_idx + 1,
                        self._unique_column_name(pdata, "row"),
                        column_name,
                        1,
                    )
                try:
                    pdata.insert(
                        loc=field_idx + 2,
                        column=self._unique_column_name(pdata, "column"),
                        value=pdata.apply(
                            lambda x: x[column_name][2], axis=1
                        ),
                    )
                except (ValueError, TypeError):
                    self._untuple_manually(
                        pdata,
                        field_idx + 2,
                        self._unique_column_name(pdata, "column"),
                        column_name,
                        2,
                    )
            elif isinstance(self._mg, VertexGrid):
                try:
                    pdata.insert(
                        loc=field_idx,
                        column=self._unique_column_name(pdata, "layer"),
                        value=pdata.apply(
                            lambda x: x[column_name][0], axis=1
                        ),
                    )
                except (ValueError, TypeError):
                    self._untuple_manually(
                        pdata,
                        field_idx,
                        self._unique_column_name(pdata, "layer"),
                        column_name,
                        0,
                    )
                try:
                    pdata.insert(
                        loc=field_idx + 1,
                        column=self._unique_column_name(pdata, "cell"),
                        value=pdata.apply(
                            lambda x: x[column_name][1], axis=1
                        ),
                    )
                except (ValueError, TypeError):
                    self._untuple_manually(
                        pdata,
                        field_idx + 1,
                        self._unique_column_name(pdata, "cell"),
                        column_name,
                        1,
                    )
            elif isinstance(self._mg, UnstructuredGrid):
                if column_name == "node":
                    # fixing a problem where node was specified as a tuple;
                    # make sure the new column is named properly
                    column_name = "node_2"
                    pdata = pdata.rename(columns={"node": column_name})
                try:
                    pdata.insert(
                        loc=field_idx,
                        column=self._unique_column_name(pdata, "node"),
                        value=pdata.apply(
                            lambda x: x[column_name][0], axis=1
                        ),
                    )
                except (ValueError, TypeError):
                    self._untuple_manually(
                        pdata,
                        field_idx,
                        self._unique_column_name(pdata, "node"),
                        column_name,
                        0,
                    )
            # remove cellid tuple
            pdata = pdata.drop(column_name, axis=1)
        return pdata, len(fields_to_correct)

    def _resolve_columns(self, data):
        """resolve the column headings for a specific dataset provided"""
        if len(data) == 0:
            return self._header_names, False

        if len(data[0]) == len(self._header_names) or len(data[0]) == 0:
            return self._header_names, False

        if len(data[0]) == len(self._data_item_names):
            return self._data_item_names, True

        if (
            len(data[0]) == len(self._header_names) - 1
            and self._header_names[-1] == "boundname"
        ):
            return self._header_names[:-1], True

        if (
            len(data[0]) == len(self._data_item_names) - 1
            and self._data_item_names[-1] == "boundname"
        ):
            return self._data_item_names[:-1], True

        return None, None

    def _untuple_recarray(self, rec):
        rec_list = rec.tolist()
        for row, line in enumerate(rec_list):
            for column, data in enumerate(line):
                if isinstance(data, tuple) and len(data) == 1:
                    line_lst = list(line)
                    line_lst[column] = data[0]
                    rec_list[row] = tuple(line_lst)
        return rec_list
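    # Sketch of what the untupling helpers above do (hedged example; the
    # three-part cellid assumes a structured grid).  A column of cellid
    # tuples such as (0, 4, 6) is expanded by _untuple_cellids into the
    # integer columns layer=0, row=4, column=6, which is the layout
    # MFPandasList keeps internally; _add_cellid_fields reverses this when
    # a recarray with "cellid" tuples is requested.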
    def set_data(self, data, autofill=False, check_data=True, append=False):
        """Sets the contents of the data to "data".  Data can have the
        following formats:

            1) recarray - recarray containing the datalist
            2) [(line_one), (line_two), ...] - list where each line of the
               datalist is a tuple within the list

        If the data is transient, a dictionary can be used to specify each
        stress period, where the dictionary key is <stress period> - 1 and
        the dictionary value is the datalist data defined above:

            {0: ndarray, 1: [(line_one), (line_two), ...],
             2: {'filename': filename}}

        Parameters
        ----------
        data : ndarray/list/dict
            Data to set
        autofill : bool
            Automatically correct data
        check_data : bool
            Whether to verify the data
        append : bool
            Append to existing data
        """
        # (re)build data header
        self._build_data_header()
        if isinstance(data, dict) and not self.has_data():
            MFPandasList.set_record(self, data)
            return
        if isinstance(data, np.recarray):
            # verify data shape of data (recarray)
            if len(data) == 0:
                # create empty dataset
                data = pandas.DataFrame(columns=self._header_names)
            elif len(data[0]) != len(self._header_names):
                if len(data[0]) == len(self._data_item_names):
                    # data most likely being stored with cellids as tuples,
                    # create a dataframe and untuple the cellids
                    data = pandas.DataFrame(
                        data, columns=self._data_item_names
                    )
                    data = self._untuple_cellids(data)[0]
                    # make sure columns are still in correct order
                    data = pandas.DataFrame(data, columns=self._header_names)
                else:
                    raise MFDataException(
                        f"ERROR: Data list {self._data_name} supplied the "
                        f"wrong number of columns of data, expected "
                        f"{len(self._data_item_names)} got {len(data[0])}."
                    )
            else:
                # data size matches the expected header names, create a
                # pandas dataframe from the data
                data_new = pandas.DataFrame(data, columns=self._header_names)
                if not self._dataframe_check(data_new):
                    data_list = self._untuple_recarray(data)
                    data = pandas.DataFrame(
                        data_list, columns=self._header_names
                    )
                else:
                    data, count = self._untuple_cellids(data_new)
                    if count > 0:
                        # make sure columns are still in correct order
                        data = pandas.DataFrame(
                            data, columns=self._header_names
                        )
        elif isinstance(data, list) or isinstance(data, tuple):
            if not (isinstance(data[0], list) or isinstance(data[0], tuple)):
                # get data in the format of a tuple of lists (or tuples)
                data = [data]
            # resolve the data's column headings
            columns = self._resolve_columns(data)[0]
            if columns is None:
                message = (
                    f"ERROR: Data list {self._data_name} supplied the "
                    f"wrong number of columns of data, expected "
                    f"{len(self._data_item_names)} got {len(data[0])}."
                )
                type_, value_, traceback_ = sys.exc_info()
                raise MFDataException(
                    self.data_dimensions.structure.get_model(),
                    self.data_dimensions.structure.get_package(),
                    self.data_dimensions.structure.path,
                    "setting list data",
                    self.data_dimensions.structure.name,
                    inspect.stack()[0][3],
                    type_,
                    value_,
                    traceback_,
                    message,
                    self._simulation_data.debug,
                )
            if len(data[0]) == 0:
                # create empty dataset
                data = pandas.DataFrame(columns=columns)
            else:
                # create dataset
                data = pandas.DataFrame(data, columns=columns)
                if (
                    self._data_item_names[-1] == "boundname"
                    and "boundname" not in columns
                ):
                    # add empty boundname column
                    data["boundname"] = ""
                # get rid of tuples from cellids
                data, count = self._untuple_cellids(data)
                if count > 0:
                    # make sure columns are still in correct order
                    data = pandas.DataFrame(data, columns=self._header_names)
        elif isinstance(data, pandas.DataFrame):
            if len(data.columns) != len(self._header_names):
                message = (
                    f"ERROR: Data list {self._data_name} supplied the "
                    f"wrong number of columns of data, expected "
                    f"{len(self._data_item_names)} got {len(data[0])}.\n"
                    f"Data columns supplied: {data.columns}\n"
                    f"Data columns expected: {self._header_names}"
                )
                type_, value_, traceback_ = sys.exc_info()
                raise MFDataException(
                    self.data_dimensions.structure.get_model(),
                    self.data_dimensions.structure.get_package(),
                    self.data_dimensions.structure.path,
                    "setting list data",
                    self.data_dimensions.structure.name,
                    inspect.stack()[0][3],
                    type_,
                    value_,
                    traceback_,
                    message,
                    self._simulation_data.debug,
                )
            # set correct data header names
            data = data.set_axis(self._header_names, axis=1)
        else:
            message = (
                f"ERROR: Data list {self._data_name} is an unsupported "
                f"type: {type(data)}."
            )
            type_, value_, traceback_ = sys.exc_info()
            raise MFDataException(
                self.data_dimensions.structure.get_model(),
                self.data_dimensions.structure.get_package(),
                self.data_dimensions.structure.path,
                "setting list data",
                self.data_dimensions.structure.name,
                inspect.stack()[0][3],
                type_,
                value_,
                traceback_,
                message,
                self._simulation_data.debug,
            )
        data_storage = self._get_storage_obj()
        if append:
            # append data to existing dataframe
            current_data = self._get_dataframe()
            if current_data is not None:
                data = pandas.concat([current_data, data])
        if data_storage.data_storage_type == DataStorageType.external_file:
            # store external data until next write
            data_storage.internal_data = data
        else:
            # store data internally
            data_storage.set_internal(data)
            data_storage.modified = True
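    # Usage sketch for set_data (hedged; "pkg_list" is a hypothetical
    # MFPandasList on a structured-grid model).  Cellids may be supplied
    # as tuples and are untupled into layer/row/column columns internally:
    #
    #     pkg_list.set_data([((0, 4, 6), -100.0), ((0, 5, 7), -50.0)])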
    def has_modified_ext_data(self):
        """check to see if external data has been modified since last
        read"""
        data_storage = self._get_storage_obj()
        return (
            data_storage.data_storage_type == DataStorageType.external_file
            and data_storage.internal_data is not None
        )
    def binary_ext_data(self):
        """check for binary data"""
        data_storage = self._get_storage_obj()
        return data_storage.binary
    def to_array(self, kper=0, mask=False):
        """Convert stress period boundary condition (MFDataList) data for
        a specified stress period to a 3-D numpy array.

        Parameters
        ----------
        kper : int
            MODFLOW zero-based stress period number to return (default is
            zero)
        mask : bool
            return array with np.nan instead of zero

        Returns
        -------
        out : dict of numpy.ndarrays
            Dictionary of 3-D numpy arrays containing the stress period
            data for a selected stress period.  The dictionary keys are the
            MFDataList dtype names for the stress period data.
        """
        sarr = self.get_data(key=kper)
        model_grid = self.data_dimensions.get_model_grid()
        return list_to_array(sarr, model_grid, kper, mask)
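    # Usage sketch for to_array (hedged; "spd" is a hypothetical list of
    # WEL stress period data on a DIS grid):
    #
    #     arrays = spd.to_array(kper=0, mask=True)  # dict of 3-D ndarrays
    #     q = arrays["q"]                           # np.nan where no BC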
    def set_record(self, record, autofill=False, check_data=True):
        """Sets the contents of the data and metadata to "record".
        "record" is a dictionary that has the following format:

            {'filename': filename, 'binary': True/False, 'data': data}

        To store the data in an external file, include 'filename' in the
        dictionary.

        Parameters
        ----------
        record : ndarray/list/dict
            Data and metadata to set
        autofill : bool
            Automatically correct data
        check_data : bool
            Whether to verify the data
        """
        if isinstance(record, dict):
            data_storage = self._get_storage_obj()
            if "filename" in record:
                data_storage.set_external(record["filename"])
                if "binary" in record:
                    if (
                        record["binary"]
                        and self.data_dimensions.package_dim.boundnames()
                    ):
                        message = (
                            "Unable to store list data ({}) to a binary "
                            "file when using boundnames.".format(
                                self.data_dimensions.structure.name
                            )
                        )
                        type_, value_, traceback_ = sys.exc_info()
                        raise MFDataException(
                            self.data_dimensions.structure.get_model(),
                            self.data_dimensions.structure.get_package(),
                            self.data_dimensions.structure.path,
                            "writing list data to binary file",
                            self.data_dimensions.structure.name,
                            inspect.stack()[0][3],
                            type_,
                            value_,
                            traceback_,
                            message,
                            self._simulation_data.debug,
                        )
                    data_storage.binary = record["binary"]
                if "data" in record:
                    # data gets written out to file
                    MFPandasList.set_data(self, record["data"])
                    # get file path
                    fd_file_path = self._get_file_path()
                    # make sure folder exists
                    folder_path = os.path.split(fd_file_path)[0]
                    if not os.path.exists(folder_path):
                        os.makedirs(folder_path)
                    # store data
                    self._write_file_entry(fd_file_path)
            else:
                if "data" in record:
                    data_storage.modified = True
                    data_storage.set_internal(None)
                    MFPandasList.set_data(self, record["data"])
            if "iprn" in record:
                data_storage.iprn = record["iprn"]
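    # Usage sketch for set_record (hedged; file name is hypothetical):
    # route the same data to an external text file instead of storing it
    # internally.
    #
    #     pkg_list.set_record(
    #         {"filename": "wel_data.txt", "binary": False, "data": data}
    #     )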
    def append_data(self, data):
        """Appends "data" to the end of this list.  Assumes data is in a
        format that can be appended directly to a pandas dataframe.

        Parameters
        ----------
        data : list(tuple)
            Data to append.
        """
        try:
            self._resync()
            if self._get_storage_obj() is None:
                self._data_storage = self._new_storage()
            data_storage = self._get_storage_obj()
            if (
                data_storage.data_storage_type
                == DataStorageType.internal_array
            ):
                # update internal data
                MFPandasList.set_data(self, data, append=True)
            elif (
                data_storage.data_storage_type
                == DataStorageType.external_file
            ):
                # get external data from file
                external_data = self._get_dataframe()
                if isinstance(data, list):
                    # build dataframe
                    data = pandas.DataFrame(
                        data, columns=external_data.columns
                    )
                # concatenate
                data = pandas.concat([external_data, data])
                # store
                ext_record = self._get_record()
                ext_record["data"] = data
                MFPandasList.set_record(self, ext_record)
        except Exception as ex:
            type_, value_, traceback_ = sys.exc_info()
            raise MFDataException(
                self.structure.get_model(),
                self.structure.get_package(),
                self._path,
                "appending data",
                self.structure.name,
                inspect.stack()[0][3],
                type_,
                value_,
                traceback_,
                None,
                self._simulation_data.debug,
                ex,
            )
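    # Usage sketch for append_data (hedged): rows are tuples matching the
    # existing column layout, cellid tuple first here.
    #
    #     pkg_list.append_data([((0, 2, 3), -10.0)])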
    def append_list_as_record(self, record):
        """Appends the list `record` as a single record in this list's
        dataframe.  Assumes `record` has the correct dimensions.

        Parameters
        ----------
        record : list
            List to be appended as a single record to the data's existing
            recarray.
        """
        self._resync()
        try:
            # store
            self.append_data([record])
        except Exception as ex:
            type_, value_, traceback_ = sys.exc_info()
            raise MFDataException(
                self.structure.get_model(),
                self.structure.get_package(),
                self._path,
                "appending data",
                self.structure.name,
                inspect.stack()[0][3],
                type_,
                value_,
                traceback_,
                None,
                self._simulation_data.debug,
                ex,
            )
    def update_record(self, record, key_index):
        """Updates a record at index "key_index" with the contents of
        "record".  If the index does not exist, update_record appends the
        contents of "record" to this list's recarray.

        Parameters
        ----------
        record : list
            New record to update data with
        key_index : int
            Stress period key of record to update.  Only used in transient
            data types.
        """
        self.append_list_as_record(record)
    def store_internal(
        self,
        check_data=True,
    ):
        """Store all data internally.

        Parameters
        ----------
        check_data : bool
            Verify data prior to storing
        """
        storage = self._get_storage_obj()
        # check if data is already stored external
        if (
            storage is None
            or storage.data_storage_type == DataStorageType.external_file
        ):
            data = self._get_dataframe()
            # if not empty dataset
            if data is not None:
                if (
                    self._simulation_data.verbosity_level.value
                    >= VerbosityLevel.verbose.value
                ):
                    print(f"Storing {self.structure.name} internally...")
                internal_data = {
                    "data": data,
                }
                MFPandasList.set_record(
                    self, internal_data, check_data=check_data
                )
    def store_as_external_file(
        self,
        external_file_path,
        binary=False,
        replace_existing_external=True,
        check_data=True,
    ):
        """Store all data externally in file external_file_path.  The
        binary option allows storage in a binary file.  If
        replace_existing_external is set to False, this method will not do
        anything if the data is already in an external file.

        Parameters
        ----------
        external_file_path : str
            Path to external file
        binary : bool
            Store data in a binary file
        replace_existing_external : bool
            Whether to replace an existing external file.
        check_data : bool
            Verify data prior to storing
        """
        # only store data externally (do not store subpackage info)
        if self.structure.construct_package is None:
            storage = self._get_storage_obj()
            # check if data is already stored external
            if (
                replace_existing_external
                or storage is None
                or storage.data_storage_type
                == DataStorageType.internal_array
                or storage.data_storage_type
                == DataStorageType.internal_constant
            ):
                data = self._get_dataframe()
                # if not empty dataset
                if data is not None:
                    if (
                        self._simulation_data.verbosity_level.value
                        >= VerbosityLevel.verbose.value
                    ):
                        print(
                            "Storing {} to external file {}...".format(
                                self.structure.name, external_file_path
                            )
                        )
                    external_data = {
                        "filename": external_file_path,
                        "data": data,
                        "binary": binary,
                    }
                    MFPandasList.set_record(
                        self, external_data, check_data=check_data
                    )
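    # Usage sketch (hedged; path is hypothetical and resolved relative to
    # the simulation/model workspace):
    #
    #     pkg_list.store_as_external_file("data/wel_spd.txt", binary=False)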
    def external_file_name(self):
        """Returns external file name, or None if this is not external
        data."""
        storage = self._get_storage_obj()
        if storage is None:
            return None
        if (
            storage.data_storage_type == DataStorageType.external_file
            and storage.fname is not None
            and storage.fname != ""
        ):
            return storage.fname
        return None
    @staticmethod
    def _file_data_to_memory(fd_data_file, first_line):
        """
        Scan the data file from the starting point to find the extent of
        the data.

        Parameters
        ----------
        fd_data_file : file descriptor
            File with data to scan.  File location should be at the
            beginning of the data.
        first_line : str
            First line of data, already read from the file.

        Returns
        -------
        list, str : data from file, next line in file after data
        """
        data_lines = []
        clean_first_line = first_line.strip().lower()
        if clean_first_line.startswith("end"):
            return data_lines, fd_data_file.readline()
        if len(clean_first_line) > 0 and clean_first_line[0] != "#":
            data_lines.append(clean_first_line)
        line = fd_data_file.readline()
        while line:
            line_mod = line.strip().lower()
            if line_mod.startswith("end"):
                return data_lines, line
            if len(line_mod) > 0 and line_mod[0] != "#":
                data_lines.append(line_mod)
            line = fd_data_file.readline()
        return data_lines, ""

    def _dataframe_check(self, data_frame):
        valid = data_frame.shape[0] > 0
        if valid:
            for name in self._header_names:
                if (
                    name != "boundname"
                    and data_frame[name].isnull().values.any()
                ):
                    valid = False
                    break
        return valid

    def _try_pandas_read(self, fd_data_file):
        delimiter_list = ["\\s+", ","]
        for delimiter in delimiter_list:
            try:
                # read flopy formatted data, entire file
                data_frame = pandas.read_csv(
                    fd_data_file,
                    sep=delimiter,
                    names=self._header_names,
                    dtype=self._data_header,
                    comment="#",
                    index_col=False,
                    skipinitialspace=True,
                )
            except BaseException:
                fd_data_file.seek(0)
                continue

            # basic check for valid dataset
            if self._dataframe_check(data_frame):
                return data_frame
            else:
                fd_data_file.seek(0)
        return None

    def _read_text_data(self, fd_data_file, first_line, external_file=False):
        """
        Read list data from a data file.

        Parameters
        ----------
        fd_data_file : file descriptor
            File with data.  File location should be at the beginning of
            the data.
        external_file : bool
            whether this is an external file

        Returns
        -------
        DataFrame : file's list data
        list : containing boolean for success of operation and the next
            line of data in the file
        """
        # initialize
        data_frame = None
        return_val = [False, None]

        # build header
        self._build_data_header()
        file_data, next_line = self._file_data_to_memory(
            fd_data_file, first_line
        )
        io_file_data = io.StringIO("\n".join(file_data))
        if external_file:
            data_frame = self._try_pandas_read(io_file_data)
            if data_frame is not None:
                self._decrement_id_fields(data_frame)
        else:
            # get number of rows of data
            if len(file_data) > 0:
                data_frame = self._try_pandas_read(io_file_data)
                if data_frame is not None:
                    self._decrement_id_fields(data_frame)
                    return_val = [True, fd_data_file.readline()]

        if data_frame is None:
            # read user formatted data using the MFList class
            list_data = MFList(
                self._simulation_data,
                self._model_or_sim,
                self.structure,
                None,
                True,
                self.path,
                self.data_dimensions.package_dim,
                self._package,
                self._block,
            )
            # start in original location
            io_file_data.seek(0)
            return_val = list_data.load(
                None, io_file_data, self._block.block_headers[-1]
            )
            rec_array = list_data.get_data()
            if rec_array is not None:
                data_frame = pandas.DataFrame(rec_array)
                data_frame = self._untuple_cellids(data_frame)[0]
                return_val = [True, fd_data_file.readline()]
            else:
                data_frame = None
        return data_frame, return_val

    def _save_binary_data(self, fd_data_file, data):
        # write
        file_access = MFFileAccessList(
            self.structure,
            self.data_dimensions,
            self._simulation_data,
            self._path,
            self._current_key,
        )
        file_access.write_binary_file(
            self._dataframe_to_recarray(data),
            fd_data_file,
            self._model_or_sim.modeldiscrit,
        )
        data_storage = self._get_storage_obj()
        data_storage.internal_data = None
    def has_data(self, key=None):
        """Returns whether this MFList has any data associated with it."""
        try:
            if self._get_storage_obj() is None:
                return False
            return self._get_storage_obj().has_data()
        except Exception as ex:
            type_, value_, traceback_ = sys.exc_info()
            raise MFDataException(
                self.structure.get_model(),
                self.structure.get_package(),
                self._path,
                "checking for data",
                self.structure.name,
                inspect.stack()[0][3],
                type_,
                value_,
                traceback_,
                None,
                self._simulation_data.debug,
                ex,
            )
    def _load_external_data(self, data_storage):
        """loads external data into a pandas DataFrame"""
        file_path = self._resolve_ext_file_path(data_storage)
        # parse next line in file as data header
        if data_storage.binary:
            file_access = MFFileAccessList(
                self.structure,
                self.data_dimensions,
                self._simulation_data,
                self._path,
                self._current_key,
            )
            np_data = file_access.read_binary_data_from_file(
                file_path,
                self._model_or_sim.modeldiscrit,
                build_cellid=False,
            )
            pd_data = pandas.DataFrame(np_data)
            if "col" in pd_data:
                # keep layer/row/column names consistent
                pd_data = pd_data.rename(columns={"col": "column"})
            self._decrement_id_fields(pd_data)
        else:
            with open(file_path, "r") as fd_data_file:
                pd_data, return_val = self._read_text_data(
                    fd_data_file, "", True
                )
        return pd_data
    def load(
        self,
        first_line,
        file_handle,
        block_header,
        pre_data_comments=None,
        external_file_info=None,
    ):
        """Loads data from first_line (the first line of data) and open
        file file_handle, which is pointing to the second line of data.
        Returns a tuple with the first item indicating whether all data was
        read and the second item being the last line of text read from the
        file.  This method was only designed for internal FloPy use and is
        not recommended for end users.

        Parameters
        ----------
        first_line : str
            A string containing the first line of data in this list.
        file_handle : file descriptor
            A file handle for the data file which points to the second
            line of data for this list
        block_header : MFBlockHeader
            Block header object that contains block header information
            for the block containing this data
        pre_data_comments : MFComment
            Comments immediately prior to the data
        external_file_info : list
            Contains information about storing files externally

        Returns
        -------
        more data : bool,
        next data line : str
        """
        data_storage = self._get_storage_obj()
        data_storage.modified = False
        # parse first line to determine if this is internal or external
        # data
        datautil.PyListUtil.reset_delimiter_used()
        arr_line = datautil.PyListUtil.split_data_line(first_line)
        if arr_line and (
            len(arr_line[0]) >= 2 and arr_line[0][:3].upper() == "END"
        ):
            return [False, arr_line]
        if len(arr_line) >= 2 and arr_line[0].upper() == "OPEN/CLOSE":
            try:
                (
                    data,
                    multiplier,
                    iprn,
                    binary,
                    data_file,
                ) = self._process_open_close_line(arr_line)
            except Exception as ex:
                message = (
                    "An error occurred while processing the following "
                    "open/close line: {}".format(arr_line)
                )
                type_, value_, traceback_ = sys.exc_info()
                raise MFDataException(
                    self.structure.get_model(),
                    self.structure.get_package(),
                    self._path,
                    "processing open/close line",
                    self.structure.name,
                    inspect.stack()[0][3],
                    type_,
                    value_,
                    traceback_,
                    message,
                    self._simulation_data.debug,
                    ex,
                )
            data_storage.set_external(data_file, data)
            data_storage.binary = binary
            data_storage.iprn = iprn
            return_val = [False, None]
        # else internal
        else:
            # read data into pandas dataframe
            pd_data, return_val = self._read_text_data(
                file_handle, first_line, False
            )
            # store internal data
            data_storage.set_internal(pd_data)
        return return_val
    def _new_storage(self):
        return {"Data": PandasListStorage()}

    def _get_storage_obj(self, first_record=False):
        return self._data_storage["Data"]

    def _get_id_fields(self, data_frame):
        """
        Assemble a list of id fields in this dataset.

        Parameters
        ----------
        data_frame : DataFrame
            data for this list

        Returns
        -------
        list of column names that are id fields
        """
        id_fields = []
        # loop through the data structure
        for idx, data_item_struct in enumerate(
            self.structure.data_item_structures
        ):
            if data_item_struct.type == DatumType.keystring:
                # handle id fields for keystring
                # ***Code not necessary for this version
                ks_key = data_frame.iloc[0, idx].lower()
                if ks_key in data_item_struct.keystring_dict:
                    data_item_ks = data_item_struct.keystring_dict[ks_key]
                else:
                    ks_key = f"{ks_key}record"
                    if ks_key in data_item_struct.keystring_dict:
                        data_item_ks = data_item_struct.keystring_dict[
                            ks_key
                        ]
                    else:
                        continue
                if isinstance(data_item_ks, MFDataStructure):
                    dis = data_item_ks.data_item_structures
                    for data_item in dis:
                        self._update_id_fields(
                            id_fields, data_item, data_frame
                        )
                else:
                    self._update_id_fields(
                        id_fields, data_item_ks, data_frame
                    )
            else:
                self._update_id_fields(
                    id_fields, data_item_struct, data_frame
                )
        return id_fields

    def _update_id_fields(self, id_fields, data_item_struct, data_frame):
        """
        Update the "id_fields" list with new field(s) based on an item in
        the expected data structure and the data provided.
        """
        if data_item_struct.numeric_index or data_item_struct.is_cellid:
            if data_item_struct.name.lower() == "cellid":
                if isinstance(self._mg, StructuredGrid):
                    id_fields.append("layer")
                    id_fields.append("row")
                    id_fields.append("column")
                elif isinstance(self._mg, VertexGrid):
                    id_fields.append("layer")
                    id_fields.append("cell")
                elif isinstance(self._mg, UnstructuredGrid):
                    id_fields.append("node")
                else:
                    raise MFDataException(
                        f"ERROR: Unrecognized model grid {str(self._mg)} "
                        "not supported by MFPandasList"
                    )
            else:
                for col in data_frame.columns:
                    if col.startswith(data_item_struct.name):
                        data_item_len = len(data_item_struct.name)
                        if len(col) > data_item_len:
                            col_end = col[data_item_len:]
                            if (
                                len(col_end) > 1
                                and col_end[0] == "_"
                                and datautil.DatumUtil.is_int(col_end[1:])
                            ):
                                id_fields.append(col)
                        else:
                            id_fields.append(data_item_struct.name)

    def _increment_id_fields(self, data_frame):
        """increment all id fields by 1 (reverse for negative values)"""
        dtypes = data_frame.dtypes
        for id_field in self._get_id_fields(data_frame):
            if id_field in data_frame:
                if id_field in dtypes and dtypes[id_field].str != "<i8":
                    # assign the result; astype returns a new object
                    data_frame[id_field] = data_frame[id_field].astype(
                        "<i8"
                    )
                data_frame.loc[data_frame[id_field].ge(-1), id_field] += 1
                data_frame.loc[data_frame[id_field].lt(-1), id_field] -= 1

    def _decrement_id_fields(self, data_frame):
        """decrement all id fields by 1 (reverse for negative values)"""
        for id_field in self._get_id_fields(data_frame):
            if id_field in data_frame:
                data_frame.loc[data_frame[id_field].le(-1), id_field] += 1
                data_frame.loc[data_frame[id_field].gt(-1), id_field] -= 1

    def _resolve_ext_file_path(self, data_storage):
        """
        Return the resolved relative path of the external file in
        "data_storage".
        """
        # pathing to external file
        data_dim = self.data_dimensions
        model_name = data_dim.package_dim.model_dim[0].model_name
        fp_relative = data_storage.fname
        if model_name is not None and fp_relative is not None:
            rel_path = self._simulation_data.mfpath.model_relative_path[
                model_name
            ]
            if rel_path is not None and len(rel_path) > 0 and rel_path != ".":
                # include model relative path in external file path
                # only if model relative path is not already in external
                # file path i.e. when reading!
                fp_rp_l = fp_relative.split(os.path.sep)
                rp_l_r = rel_path.split(os.path.sep)[::-1]
                for i, rp in enumerate(rp_l_r):
                    if rp != fp_rp_l[len(rp_l_r) - i - 1]:
                        fp_relative = os.path.join(rp, fp_relative)
            fp = self._simulation_data.mfpath.resolve_path(
                fp_relative, model_name
            )
        else:
            if fp_relative is not None:
                fp = os.path.join(
                    self._simulation_data.mfpath.get_sim_path(), fp_relative
                )
            else:
                fp = self._simulation_data.mfpath.get_sim_path()
        return fp

    def _dataframe_to_recarray(self, data_frame):
        # convert cellids to tuple
        df_rec = self._add_cellid_fields(data_frame, False)

        # convert to recarray
        return df_rec.to_records(index=False)

    def _get_data(self):
        dataframe = self._get_dataframe()
        if dataframe is None:
            return None
        return self._dataframe_to_recarray(dataframe)

    def _get_dataframe(self):
        """get and return dataframe for this list data"""
        data_storage = self._get_storage_obj()
        if data_storage is None or data_storage.data_storage_type is None:
            block_exists = self._block.header_exists(
                self._current_key, self.path
            )
            if block_exists:
                self._build_data_header()
                return pandas.DataFrame(columns=self._header_names)
            else:
                return None
        if data_storage.data_storage_type == DataStorageType.internal_array:
            data = copy.deepcopy(data_storage.internal_data)
        else:
            if data_storage.internal_data is not None:
                # latest data is in internal cache
                data = copy.deepcopy(data_storage.internal_data)
            else:
                # load data from file and return
                data = self._load_external_data(data_storage)
        return data
    def get_dataframe(self):
        """Returns the list's data as a dataframe.

        Returns
        -------
        data : DataFrame
        """
        return self._get_dataframe()
    def get_data(self, apply_mult=False, **kwargs):
        """Returns the list's data as a recarray.

        Parameters
        ----------
        apply_mult : bool
            Whether to apply a multiplier.

        Returns
        -------
        data : recarray
        """
        return self._get_data()
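    # Usage sketch (hedged): the same list can be read back either way.
    #
    #     rec = pkg_list.get_data()       # numpy recarray, cellids as tuples
    #     df = pkg_list.get_dataframe()   # pandas DataFrame, id columns split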
    def get_record(self, data_frame=False):
        """Returns the list's data and metadata in a dictionary.  Data is
        in key "data" and metadata in keys "filename" and "binary".

        Returns
        -------
        data_record : dict
        """
        return self._get_record(data_frame)
    def _get_record(self, data_frame=False):
        """Returns the list's data and metadata in a dictionary.  Data is
        in key "data" and metadata in keys "filename" and "binary".

        Returns
        -------
        data_record : dict
        """
        try:
            if self._get_storage_obj() is None:
                return None
            record = self._get_storage_obj().get_record()
        except Exception as ex:
            type_, value_, traceback_ = sys.exc_info()
            raise MFDataException(
                self.structure.get_model(),
                self.structure.get_package(),
                self._path,
                "getting record",
                self.structure.name,
                inspect.stack()[0][3],
                type_,
                value_,
                traceback_,
                None,
                self._simulation_data.debug,
                ex,
            )
        if not data_frame:
            if "data" not in record:
                record["data"] = self._get_data()
            elif record["data"] is not None:
                data = copy.deepcopy(record["data"])
                record["data"] = self._dataframe_to_recarray(data)
        else:
            if "data" not in record:
                record["data"] = self._get_dataframe()
        return record
    def write_file_entry(
        self,
        fd_data_file,
        ext_file_action=ExtFileAction.copy_relative_paths,
        fd_main=None,
    ):
        """
        Writes the file entry to a file, or if fd_data_file is None,
        returns the file entry as a string.

        Parameters
        ----------
        fd_data_file : file descriptor
            Where data is written
        ext_file_action : ExtFileAction
            What action to perform on external files
        fd_main : file descriptor
            Where the open/close string should be written (for external
            file data)

        Returns
        -------
        file entry : str
        """
        return self._write_file_entry(fd_data_file, ext_file_action, fd_main)
    def get_file_entry(
        self,
        ext_file_action=ExtFileAction.copy_relative_paths,
    ):
        """Returns a string containing the data formatted for a MODFLOW 6
        file.

        Parameters
        ----------
        ext_file_action : ExtFileAction
            How to handle external paths.

        Returns
        -------
        file entry : str
        """
        return self._write_file_entry(None)
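    # Usage sketch (hedged): get_file_entry renders the list as it would
    # appear in the package file, one whitespace-delimited row per record
    # with id fields converted to 1-based values; exact float formatting
    # follows the simulation settings.
    #
    #     text = pkg_list.get_file_entry()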
    def _write_file_entry(
        self,
        fd_data_file,
        ext_file_action=ExtFileAction.copy_relative_paths,
        fd_main=None,
    ):
        """
        Writes the file entry to a file, or if fd_data_file is None,
        returns the file entry as a string.

        Parameters
        ----------
        fd_data_file : file descriptor
            Where data is written
        ext_file_action : ExtFileAction
            What action to perform on external files
        fd_main : file descriptor
            Where the open/close string should be written (for external
            file data)

        Returns
        -------
        result of pandas to_csv call
        """
        data_storage = self._get_storage_obj()
        if data_storage is None:
            return ""
        if (
            data_storage.data_storage_type == DataStorageType.external_file
            and fd_main is not None
        ):
            indent = self._simulation_data.indent_string
            ext_string, fname = self._get_external_formatting_str(
                data_storage.fname,
                None,
                data_storage.binary,
                data_storage.iprn,
                DataStructureType.recarray,
                ext_file_action,
            )
            data_storage.fname = fname
            fd_main.write(f"{indent}{indent}{ext_string}")
        if data_storage is None or data_storage.internal_data is None:
            return ""

        # loop through data pieces
        data = self._remove_cellid_fields(data_storage.internal_data)
        if (
            data_storage.data_storage_type == DataStorageType.internal_array
            or not data_storage.binary
            or fd_data_file is None
        ):
            # add spacer columns
            if "leading_space" not in data:
                data.insert(loc=0, column="leading_space", value="")
            if "leading_space_2" not in data:
                data.insert(loc=0, column="leading_space_2", value="")

        result = ""
        # if data is internal or has been modified
        if (
            data_storage.data_storage_type == DataStorageType.internal_array
            or data is not None
            or fd_data_file is None
        ):
            if (
                data_storage.data_storage_type
                == DataStorageType.external_file
                and data_storage.binary
                and fd_data_file is not None
            ):
                # write old way using numpy
                self._save_binary_data(fd_data_file, data)
            else:
                if data.shape[0] == 0:
                    if fd_data_file is None or not isinstance(
                        fd_data_file, io.TextIOBase
                    ):
                        result = "\n"
                    else:
                        # no data, just write empty line
                        fd_data_file.write("\n")
                else:
                    # convert data to 1-based
                    self._increment_id_fields(data)
                    # write converted data
                    float_format = (
                        f"%{self._simulation_data.reg_format_str[2:-1]}"
                    )
                    result = data.to_csv(
                        fd_data_file,
                        sep=" ",
                        header=False,
                        index=False,
                        float_format=float_format,
                        lineterminator="\n",
                    )
                    # clean up
                    data_storage.modified = False
                    self._decrement_id_fields(data)
                if (
                    data_storage.data_storage_type
                    == DataStorageType.external_file
                ):
                    data_storage.internal_data = None

        if data_storage.internal_data is not None:
            # clean up
            if "leading_space" in data_storage.internal_data:
                data_storage.internal_data = data_storage.internal_data.drop(
                    columns="leading_space"
                )
            if "leading_space_2" in data_storage.internal_data:
                data_storage.internal_data = data_storage.internal_data.drop(
                    columns="leading_space_2"
                )
        return result

    def _get_file_path(self):
        """
        Get the file path to the data.

        Returns
        -------
        file_path : file path to data
        """
        data_storage = self._get_storage_obj()
        if data_storage.fname is None:
            return None
        if self._model_or_sim.type == "model":
            rel_path = self._simulation_data.mfpath.model_relative_path[
                self._model_or_sim.name
            ]
            fp_relative = data_storage.fname
            if rel_path is not None and len(rel_path) > 0 and rel_path != ".":
                # include model relative path in external file path
                # only if model relative path is not already in external
                # file path i.e. when reading!
                fp_rp_l = fp_relative.split(os.path.sep)
                rp_l_r = rel_path.split(os.path.sep)[::-1]
                for i, rp in enumerate(rp_l_r):
                    if rp != fp_rp_l[len(rp_l_r) - i - 1]:
                        fp_relative = os.path.join(rp, fp_relative)
            return self._simulation_data.mfpath.resolve_path(
                fp_relative, self._model_or_sim.name
            )
        else:
            return os.path.join(
                self._simulation_data.mfpath.get_sim_path(),
                data_storage.fname,
            )
    def plot(
        self,
        key=None,
        names=None,
        filename_base=None,
        file_extension=None,
        mflay=None,
        **kwargs,
    ):
        """
        Plot boundary condition (MfList) data

        Parameters
        ----------
        key : str
            MfList dictionary key. (default is None)
        names : list
            List of names for figure titles. (default is None)
        filename_base : str
            Base file name that will be used to automatically generate
            file names for output image files.  Plots will be exported as
            image files if file_name_base is not None. (default is None)
        file_extension : str
            Valid matplotlib.pyplot file extension for savefig().  Only
            used if filename_base is not None. (default is 'png')
        mflay : int
            MODFLOW zero-based layer number to return.  If None, then all
            layers will be included. (default is None)
        **kwargs : dict
            axes : list of matplotlib.pyplot.axis
                List of matplotlib.pyplot.axis that will be used to plot
                data for each layer.  If axes=None, axes will be
                generated. (default is None)
            pcolor : bool
                Boolean used to determine if matplotlib.pyplot.pcolormesh
                plot will be plotted. (default is True)
            colorbar : bool
                Boolean used to determine if a color bar will be added to
                the matplotlib.pyplot.pcolormesh.  Only used if
                pcolor=True. (default is False)
            inactive : bool
                Boolean used to determine if a black overlay in inactive
                cells in a layer will be displayed. (default is True)
            contour : bool
                Boolean used to determine if matplotlib.pyplot.contour
                plot will be plotted. (default is False)
            clabel : bool
                Boolean used to determine if matplotlib.pyplot.clabel will
                be plotted.  Only used if contour=True. (default is False)
            grid : bool
                Boolean used to determine if the model grid will be
                plotted on the figure. (default is False)
            masked_values : list
                List of unique values to be excluded from the plot.

        Returns
        -------
        out : list
            Empty list is returned if filename_base is not None.
            Otherwise a list of matplotlib.pyplot.axis is returned.
        """
        from ...plot import PlotUtilities

        if not self.plottable:
            raise TypeError("Simulation level packages are not plottable")

        # list data without cellids cannot be plotted spatially
        if "cellid" not in self.dtype.names:
            return

        PlotUtilities._plot_mflist_helper(
            mflist=self,
            key=key,
            kper=None,
            names=names,
            filename_base=None,
            file_extension=None,
            mflay=None,
            **kwargs,
        )
class MFPandasTransientList(
    MFPandasList, mfdata.MFTransient, DataListInterface
):
    """
    Provides an interface for the user to access and update MODFLOW
    transient pandas list data.

    Parameters
    ----------
    sim_data : MFSimulationData
        data contained in the simulation
    structure : MFDataStructure
        describes the structure of the data
    enable : bool
        enable/disable the array
    path : tuple
        path in the data dictionary to this MFArray
    dimensions : MFDataDimensions
        dimension information related to the model, package, and array
    """

    def __init__(
        self,
        sim_data,
        model_or_sim,
        structure,
        enable=True,
        path=None,
        dimensions=None,
        package=None,
        block=None,
    ):
        super().__init__(
            sim_data=sim_data,
            model_or_sim=model_or_sim,
            structure=structure,
            data=None,
            enable=enable,
            path=path,
            dimensions=dimensions,
            package=package,
            block=block,
        )
        self.repeating = True
        self.empty_keys = {}

    @property
    def data_type(self):
        return DataType.transientlist

    @property
    def dtype(self):
        data = self.get_data()
        if len(data) > 0:
            if 0 in data:
                return data[0].dtype
            else:
                return next(iter(data.values())).dtype
        else:
            return None

    @property
    def plottable(self):
        """If this list data is plottable"""
        if self.model is None:
            return False
        else:
            return True

    @property
    def data(self):
        """Returns list data.  Calls get_data with default parameters."""
        return self.get_data()

    @property
    def dataframe(self):
        """Returns list data as a DataFrame.  Calls get_dataframe with
        default parameters."""
        return self.get_dataframe()
    def to_array(self, kper=0, mask=False):
        """Returns list data as an array."""
        return super().to_array(kper, mask)
    def remove_transient_key(self, transient_key):
        """Remove transient stress period key.  Method is used internally
        by FloPy and is not intended for the end user.
        """
        if transient_key in self._data_storage:
            del self._data_storage[transient_key]
    def add_transient_key(self, transient_key):
        """Adds a new transient time allowing data for that time to be
        stored and retrieved using the key `transient_key`.  Method is
        used internally by FloPy and is not intended for the end user.

        Parameters
        ----------
        transient_key : int
            Zero-based stress period to add
        """
        super().add_transient_key(transient_key)
        self._data_storage[transient_key] = PandasListStorage()
    def store_as_external_file(
        self,
        external_file_path,
        binary=False,
        replace_existing_external=True,
        check_data=True,
    ):
        """Store all data externally in file external_file_path.  The
        binary option allows storage in a binary file.  If
        replace_existing_external is set to False, this method will not do
        anything if the data is already in an external file.

        Parameters
        ----------
        external_file_path : str
            Path to external file
        binary : bool
            Store data in a binary file
        replace_existing_external : bool
            Whether to replace an existing external file.
        check_data : bool
            Verify data prior to storing
        """
        self._cache_model_grid = True
        for sp in self._data_storage.keys():
            self._current_key = sp
            storage = self._get_storage_obj()
            if storage.internal_size == 0:
                storage.internal_data = self.get_dataframe(sp)
            if storage.internal_size > 0 and (
                self._get_storage_obj().data_storage_type
                != DataStorageType.external_file
                or replace_existing_external
            ):
                fname, ext = os.path.splitext(external_file_path)
                if datautil.DatumUtil.is_int(sp):
                    full_name = f"{fname}_{int(sp) + 1}{ext}"
                else:
                    full_name = f"{fname}_{sp}{ext}"
                super().store_as_external_file(
                    full_name,
                    binary,
                    replace_existing_external,
                    check_data,
                )
        self._cache_model_grid = False
    def store_internal(
        self,
        check_data=True,
    ):
        """Store all data internally.

        Parameters
        ----------
        check_data : bool
            Verify data prior to storing
        """
        self._cache_model_grid = True
        for sp in self._data_storage.keys():
            self._current_key = sp
            if (
                self._get_storage_obj().data_storage_type
                == DataStorageType.external_file
            ):
                super().store_internal(
                    check_data,
                )
        self._cache_model_grid = False
    def has_data(self, key=None):
        """Returns whether this MFList has any data associated with it in
        key "key"."""
        if key is None:
            for sto_key in self._data_storage.keys():
                self.get_data_prep(sto_key)
                if super().has_data():
                    return True
            return False
        else:
            self.get_data_prep(key)
            return super().has_data()
    def has_modified_ext_data(self, key=None):
        """Check to see if external data has been modified since last
        read."""
        if key is None:
            for sto_key in self._data_storage.keys():
                self.get_data_prep(sto_key)
                if super().has_modified_ext_data():
                    return True
            return False
        else:
            self.get_data_prep(key)
            return super().has_modified_ext_data()
    def binary_ext_data(self, key=None):
        """Check for binary data."""
        if key is None:
            for sto_key in self._data_storage.keys():
                self.get_data_prep(sto_key)
                if super().binary_ext_data():
                    return True
            return False
        else:
            self.get_data_prep(key)
            return super().binary_ext_data()
    def get_record(self, key=None, data_frame=False):
        """Returns the data for stress period `key`.  If no key is
        specified, returns all records in a dictionary with zero-based
        stress period numbers as keys.  See MFList's get_record
        documentation for more information on the format of each record
        returned.

        Parameters
        ----------
        key : int
            Zero-based stress period to return data from.
        data_frame : bool
            whether to return a Pandas DataFrame object instead of a
            recarray

        Returns
        -------
        data_record : dict
        """
        if self._data_storage is not None and len(self._data_storage) > 0:
            if key is None:
                output = {}
                for key in self._data_storage.keys():
                    self.get_data_prep(key)
                    output[key] = super().get_record(data_frame=data_frame)
                return output
            self.get_data_prep(key)
            return super().get_record(data_frame=data_frame)
        else:
            return None
    def get_dataframe(self, key=None, apply_mult=False):
        """Returns the data for stress period `key` as a Pandas DataFrame.
        Calls get_data with dataframe=True."""
        return self.get_data(key, apply_mult, dataframe=True)
    def get_data(self, key=None, apply_mult=False, dataframe=False, **kwargs):
        """Returns the data for stress period `key`.

        Parameters
        ----------
        key : int
            Zero-based stress period to return data from.
        apply_mult : bool
            Apply multiplier
        dataframe : bool
            Get as pandas dataframe

        Returns
        -------
        data : recarray
        """
        if self._data_storage is not None and len(self._data_storage) > 0:
            if key is None:
                if "array" in kwargs:
                    output = []
                    sim_time = self.data_dimensions.package_dim.model_dim[
                        0
                    ].simulation_time
                    num_sp = sim_time.get_num_stress_periods()
                    data = None
                    for sp in range(0, num_sp):
                        if sp in self._data_storage:
                            self.get_data_prep(sp)
                            data = super().get_data(apply_mult=apply_mult)
                        elif self._block.header_exists(sp):
                            data = None
                        output.append(data)
                    return output
                else:
                    output = {}
                    for key in self._data_storage.keys():
                        self.get_data_prep(key)
                        if dataframe:
                            output[key] = super().get_dataframe()
                        else:
                            output[key] = super().get_data(
                                apply_mult=apply_mult
                            )
                    return output
            self.get_data_prep(key)
            if dataframe:
                return super().get_dataframe()
            else:
                return super().get_data(apply_mult=apply_mult)
        else:
            return None
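    # Usage sketch (hedged; "spd" is a hypothetical transient list such as
    # WEL stress period data).  With no key, every stored stress period is
    # returned in a dict keyed by zero-based kper:
    #
    #     all_spd = spd.get_data()        # {0: recarray, 1: recarray, ...}
    #     df0 = spd.get_dataframe(key=0)  # one period as a DataFrame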
    def set_record(self, record, autofill=False, check_data=True):
        """Sets the contents of the data based on the contents of
        `record`.

        Parameters
        ----------
        record : dict
            Record being set.  Record must be a dictionary with keys as
            zero-based stress periods and values as dictionaries
            containing the data and metadata.  See MFList's set_record
            documentation for more information on the format of the
            values.
        autofill : bool
            Automatically correct data
        check_data : bool
            Whether to verify the data
        """
        self._set_data_record(
            record,
            autofill=autofill,
            check_data=check_data,
            is_record=True,
        )
    def set_data(self, data, key=None, autofill=False):
        """Sets the contents of the data at time `key` to `data`.

        Parameters
        ----------
        data : dict, recarray, list
            Data being set.  Data can be a dictionary with keys as
            zero-based stress periods and values as the data.  If data is
            a recarray or list of tuples, it will be assigned to the
            stress period specified in `key`.  If a stress period's value
            is set to None, the data for that stress period will be
            removed.
        key : int
            Zero-based stress period to assign data to.  Does not apply
            if `data` is a dictionary.
        autofill : bool
            Automatically correct data.
        """
        self._set_data_record(data, key, autofill)
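    # Usage sketch (hedged): transient data is usually supplied as a dict
    # keyed by zero-based stress period; a plain list is assigned to "key".
    #
    #     spd.set_data({0: [((0, 4, 6), -100.0)], 1: [((0, 4, 6), -75.0)]})
    #     spd.set_data([((0, 4, 6), -60.0)], key=2)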
    def masked_4D_arrays_itr(self):
        """Returns list data as an iterator of a masked 4D array."""
        model_grid = self.data_dimensions.get_model_grid()
        nper = self.data_dimensions.package_dim.model_dim[
            0
        ].simulation_time.get_num_stress_periods()
        # get the first kper
        arrays = self.to_array(kper=0, mask=True)

        if arrays is not None:
            # initialize these big arrays
            for name, array in arrays.items():
                if model_grid.grid_type() == DiscretizationType.DIS:
                    m4d = np.zeros(
                        (
                            nper,
                            model_grid.num_layers(),
                            model_grid.num_rows(),
                            model_grid.num_columns(),
                        )
                    )
                    m4d[0, :, :, :] = array
                    for kper in range(1, nper):
                        arrays = self.to_array(kper=kper, mask=True)
                        for tname, array in arrays.items():
                            if tname == name:
                                m4d[kper, :, :, :] = array
                    yield name, m4d
                else:
                    m3d = np.zeros(
                        (
                            nper,
                            model_grid.num_layers(),
                            model_grid.num_cells_per_layer(),
                        )
                    )
                    m3d[0, :, :] = array
                    for kper in range(1, nper):
                        arrays = self.to_array(kper=kper, mask=True)
                        for tname, array in arrays.items():
                            if tname == name:
                                m3d[kper, :, :] = array
                    yield name, m3d
def _set_data_record( self, data_record, key=None, autofill=False, check_data=False, is_record=False, ): self._cache_model_grid = True if isinstance(data_record, dict): if "filename" not in data_record and "data" not in data_record: # each item in the dictionary is a list for one stress period # the dictionary key is the stress period the list is for del_keys = [] for key, list_item in data_record.items(): list_item_record = False if list_item is None: self.remove_transient_key(key) del_keys.append(key) self.empty_keys[key] = False elif isinstance(list_item, list) and len(list_item) == 0: self.empty_keys[key] = True else: self.empty_keys[key] = False if isinstance(list_item, dict): list_item_record = True self._set_data_prep(list_item, key) if list_item_record: super().set_record(list_item, autofill, check_data) else: super().set_data( list_item, autofill=autofill, check_data=check_data, ) for key in del_keys: del data_record[key] else: self.empty_keys[key] = False self._set_data_prep(data_record["data"], key) super().set_data(data_record, autofill) else: if is_record: comment = ( "Set record method requires that data_record is a " "dictionary." ) type_, value_, traceback_ = sys.exc_info() raise MFDataException( self.structure.get_model(), self.structure.get_package(), self._path, "setting data record", self.structure.name, inspect.stack()[0][3], type_, value_, traceback_, comment, self._simulation_data.debug, ) if key is None: # search for a key new_key_index = self.structure.first_non_keyword_index() if ( new_key_index is not None and len(data_record) > new_key_index ): key = data_record[new_key_index] else: key = 0 if isinstance(data_record, list) and len(data_record) == 0: self.empty_keys[key] = True else: check = True if ( isinstance(data_record, list) and len(data_record) > 0 and data_record[0] == "no_check" ): # not checking data check = False data_record = data_record[1:] self.empty_keys[key] = False if data_record is None: self.remove_transient_key(key) else: self._set_data_prep(data_record, key) super().set_data(data_record, autofill, check_data=check) self._cache_model_grid = False
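    # A hedged sketch of the "no_check" sentinel handled above: a list whose
    # first element is the string "no_check" disables data checking for that
    # assignment. Package and values below are hypothetical:
    #
    #   ghb.stress_period_data.set_data(
    #       ["no_check", ((0, 0, 0), 1.0, 10.0)], key=0
    #   )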
[docs]    def external_file_name(self, key=0):
        """Returns the external file name, or None if this is not external
        data.

        Parameters
        ----------
        key : int
            Zero-based stress period to return data from.

        Returns
        -------
        file name : str or None
        """
        if key in self.empty_keys and self.empty_keys[key]:
            return None
        else:
            self._get_file_entry_prep(key)
            return super().external_file_name()
[docs]    def write_file_entry(
        self,
        fd_data_file,
        key=0,
        ext_file_action=ExtFileAction.copy_relative_paths,
        fd_main=None,
    ):
        """Writes the data at time `key`, formatted for a MODFLOW 6 file,
        to `fd_data_file`.

        Parameters
        ----------
        fd_data_file : file
            File to write the data to
        key : int
            Zero-based stress period to return data from.
        ext_file_action : ExtFileAction
            How to handle external paths.
        fd_main : file
            Main file to write to. When the data is stored in an external
            file, the open/close line is written to `fd_main`.

        Returns
        -------
        file entry : str
        """
        if key in self.empty_keys and self.empty_keys[key]:
            return ""
        else:
            self._get_file_entry_prep(key)
            return super().write_file_entry(
                fd_data_file,
                ext_file_action=ext_file_action,
                fd_main=fd_main,
            )
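    # A minimal sketch of writing one period's data to an open file
    # (hypothetical file name and package variable):
    #
    #   with open("ghb_sp1.dat", "w") as fd:
    #       ghb.stress_period_data.write_file_entry(fd, key=0)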
[docs]    def get_file_entry(
        self, key=0, ext_file_action=ExtFileAction.copy_relative_paths
    ):
        """Returns a string containing the data at time `key` formatted for
        a MODFLOW 6 file.

        Parameters
        ----------
        key : int
            Zero-based stress period to return data from.
        ext_file_action : ExtFileAction
            How to handle external paths.

        Returns
        -------
        file entry : str
        """
        if key in self.empty_keys and self.empty_keys[key]:
            return ""
        else:
            self._get_file_entry_prep(key)
            return super()._write_file_entry(
                None, ext_file_action=ext_file_action
            )
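    # Sketch: get the formatted period-block text as a string instead of
    # writing it to a file (hypothetical package variable):
    #
    #   entry = ghb.stress_period_data.get_file_entry(key=0)
    #   print(entry)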
[docs]    def load(
        self,
        first_line,
        file_handle,
        block_header,
        pre_data_comments=None,
        external_file_info=None,
    ):
        """Loads data from `first_line` (the first line of data) and open
        file `file_handle`, which is pointing to the second line of data.
        Returns a tuple with the first item indicating whether all data was
        read and the second item being the last line of text read from the
        file.

        Parameters
        ----------
        first_line : str
            A string containing the first line of data in this list.
        file_handle : file descriptor
            A file handle for the data file which points to the second
            line of data for this list
        block_header : MFBlockHeader
            Block header object that contains block header information
            for the block containing this data
        pre_data_comments : MFComment
            Comments immediately prior to the data
        external_file_info : list
            Contains information about storing files externally
        """
        self._load_prep(block_header)
        return super().load(
            first_line,
            file_handle,
            block_header,
            pre_data_comments,
            external_file_info,
        )
[docs]    def append_list_as_record(self, record, key=0):
        """Appends the list `record` as a single record in this list's
        recarray at time `key`. Assumes `record` has the correct dimensions.

        Parameters
        ----------
        record : list
            Data to append
        key : int
            Zero-based stress period to append data to.
        """
        self._append_list_as_record_prep(record, key)
        super().append_list_as_record(record)
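    # Sketch: append one record to stress period 0 (cellid tuple plus the
    # package-specific values; all values illustrative):
    #
    #   ghb.stress_period_data.append_list_as_record(
    #       [(0, 0, 5), 2.5, 40.0], key=0
    #   )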
[docs]    def update_record(self, record, key_index, key=0):
        """Updates the record at index `key_index` and time `key` with the
        contents of `record`. If the index does not exist, update_record
        appends the contents of `record` to this list's recarray.

        Parameters
        ----------
        record : list
            Record to update with
        key_index : int
            Index to update
        key : int
            Zero-based stress period to update data in
        """
        self._update_record_prep(key)
        super().update_record(record, key_index)
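    # Sketch: overwrite the first record of stress period 0 (values
    # illustrative):
    #
    #   ghb.stress_period_data.update_record([(0, 0, 0), 1.5, 12.0], 0, key=0)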
    def _new_storage(self):
        # transient data is stored in a dictionary of storage objects keyed
        # by zero-based stress period
        return {}

    def _get_storage_obj(self, first_record=False):
        if first_record and isinstance(self._data_storage, dict):
            # return the storage object for the first stress period found
            for value in self._data_storage.values():
                return value
            return None
        if (
            self._current_key is None
            or self._current_key not in self._data_storage
        ):
            return None
        return self._data_storage[self._current_key]
[docs]    def plot(
        self,
        key=None,
        names=None,
        kper=0,
        filename_base=None,
        file_extension=None,
        mflay=None,
        **kwargs,
    ):
        """
        Plot stress period boundary condition (MfList) data for a specified
        stress period

        Parameters
        ----------
        key : str
            MfList dictionary key. (default is None)
        names : list
            List of names for figure titles. (default is None)
        kper : int
            MODFLOW zero-based stress period number to return.
            (default is zero)
        filename_base : str
            Base file name that will be used to automatically generate file
            names for output image files. Plots will be exported as image
            files if filename_base is not None. (default is None)
        file_extension : str
            Valid matplotlib.pyplot file extension for savefig(). Only used
            if filename_base is not None. (default is 'png')
        mflay : int
            MODFLOW zero-based layer number to return. If None, then all
            layers will be included. (default is None)
        **kwargs : dict
            axes : list of matplotlib.pyplot.axis
                List of matplotlib.pyplot.axis that will be used to plot
                data for each layer. If axes=None axes will be generated.
                (default is None)
            pcolor : bool
                Boolean used to determine if matplotlib.pyplot.pcolormesh
                plot will be plotted. (default is True)
            colorbar : bool
                Boolean used to determine if a color bar will be added to
                the matplotlib.pyplot.pcolormesh. Only used if pcolor=True.
                (default is False)
            inactive : bool
                Boolean used to determine if a black overlay of inactive
                cells in a layer will be displayed. (default is True)
            contour : bool
                Boolean used to determine if matplotlib.pyplot.contour plot
                will be plotted. (default is False)
            clabel : bool
                Boolean used to determine if matplotlib.pyplot.clabel will
                be plotted. Only used if contour=True. (default is False)
            grid : bool
                Boolean used to determine if the model grid will be plotted
                on the figure. (default is False)
            masked_values : list
                List of unique values to be excluded from the plot.

        Returns
        -------
        out : list
            Empty list is returned if filename_base is not None. Otherwise
            a list of matplotlib.pyplot.axis is returned.
        """
        from ...plot import PlotUtilities

        if not self.plottable:
            raise TypeError("Simulation level packages are not plottable")

        # model.plot() will not work for a mf6 model oc package unless
        # this check is here
        if self.get_data() is None:
            return

        if "cellid" not in self.dtype.names:
            return

        axes = PlotUtilities._plot_mflist_helper(
            self,
            key=key,
            names=names,
            kper=kper,
            filename_base=filename_base,
            file_extension=file_extension,
            mflay=mflay,
            **kwargs,
        )
        return axes
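    # A hedged plotting sketch (assumes matplotlib is installed and `riv` is
    # a plottable boundary package; variable names are illustrative):
    #
    #   import matplotlib.pyplot as plt
    #   axes = riv.stress_period_data.plot(kper=0, colorbar=True)
    #   plt.show()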