Source code for flopy.utils.datafile

"""
Module to read MODFLOW output files.  The module contains shared
abstract classes that should not be directly accessed.

"""
import numpy as np





[docs]class LayerFile: """ The LayerFile class is the abstract base class from which specific derived classes are formed. LayerFile This class should not be instantiated directly. """ def __init__(self, filename, precision, verbose, kwargs): from ..discretization.structuredgrid import StructuredGrid self.filename = filename self.precision = precision self.verbose = verbose self.file = open(self.filename, "rb") # Get filesize to ensure this is not an empty file self.file.seek(0, 2) totalbytes = self.file.tell() self.file.seek(0, 0) # reset to beginning assert self.file.tell() == 0 if totalbytes == 0: raise ValueError(f"datafile error: file is empty: {filename}") self.nrow = 0 self.ncol = 0 self.nlay = 0 self.times = [] self.kstpkper = [] self.recordarray = [] self.iposarray = [] if precision == "single": self.realtype = np.float32 elif precision == "double": self.realtype = np.float64 else: raise Exception(f"Unknown precision specified: {precision}") self.model = None self.dis = None self.mg = None if "model" in kwargs.keys(): self.model = kwargs.pop("model") self.mg = self.model.modelgrid self.dis = self.model.dis if "dis" in kwargs.keys(): self.dis = kwargs.pop("dis") self.mg = self.dis.parent.modelgrid if "modelgrid" in kwargs.keys(): self.mg = kwargs.pop("modelgrid") if len(kwargs.keys()) > 0: args = ",".join(kwargs.keys()) raise Exception(f"LayerFile error: unrecognized kwargs: {args}") # read through the file and build the pointer index self._build_index() # now that we read the data and know nrow and ncol, # we can make a generic mg if needed if self.mg is None: self.mg = StructuredGrid( delc=np.ones((self.nrow,)), delr=np.ones( self.ncol, ), nlay=self.nlay, xoff=0.0, yoff=0.0, angrot=0.0, ) return
[docs] def to_shapefile( self, filename, kstpkper=None, totim=None, mflay=None, attrib_name="lf_data", ): """ Export model output data to a shapefile at a specific location in LayerFile instance. Parameters ---------- filename : str Shapefile name to write kstpkper : tuple of ints A tuple containing the time step and stress period (kstp, kper). These are zero-based kstp and kper values. totim : float The simulation time. mflay : integer MODFLOW zero-based layer number to return. If None, then layer 1 will be written attrib_name : str Base name of attribute columns. (default is 'lf_data') Returns ---------- None See Also -------- Notes ----- Examples -------- >>> import flopy >>> hdobj = flopy.utils.HeadFile('test.hds') >>> times = hdobj.get_times() >>> hdobj.to_shapefile('test_heads_sp6.shp', totim=times[-1]) """ plotarray = np.atleast_3d( self.get_data( kstpkper=kstpkper, totim=totim, mflay=mflay ).transpose() ).transpose() if mflay != None: attrib_dict = {f"{attrib_name}{mflay}": plotarray[0, :, :]} else: attrib_dict = {} for k in range(plotarray.shape[0]): name = f"{attrib_name}{k}" attrib_dict[name] = plotarray[k] from ..export.shapefile_utils import write_grid_shapefile write_grid_shapefile(filename, self.mg, attrib_dict)
[docs] def plot( self, axes=None, kstpkper=None, totim=None, mflay=None, filename_base=None, **kwargs, ): """ Plot 3-D model output data in a specific location in LayerFile instance Parameters ---------- axes : list of matplotlib.pyplot.axis List of matplotlib.pyplot.axis that will be used to plot data for each layer. If axes=None axes will be generated. (default is None) kstpkper : tuple of ints A tuple containing the time step and stress period (kstp, kper). These are zero-based kstp and kper values. totim : float The simulation time. mflay : int MODFLOW zero-based layer number to return. If None, then all all layers will be included. (default is None) filename_base : str Base file name that will be used to automatically generate file names for output image files. Plots will be exported as image files if file_name_base is not None. (default is None) **kwargs : dict pcolor : bool Boolean used to determine if matplotlib.pyplot.pcolormesh plot will be plotted. (default is True) colorbar : bool Boolean used to determine if a color bar will be added to the matplotlib.pyplot.pcolormesh. Only used if pcolor=True. (default is False) contour : bool Boolean used to determine if matplotlib.pyplot.contour plot will be plotted. (default is False) clabel : bool Boolean used to determine if matplotlib.pyplot.clabel will be plotted. Only used if contour=True. (default is False) grid : bool Boolean used to determine if the model grid will be plotted on the figure. (default is False) masked_values : list List of unique values to be excluded from the plot. file_extension : str Valid matplotlib.pyplot file extension for savefig(). Only used if filename_base is not None. (default is 'png') Returns ---------- None See Also -------- Notes ----- Examples -------- >>> import flopy >>> hdobj = flopy.utils.HeadFile('test.hds') >>> times = hdobj.get_times() >>> hdobj.plot(totim=times[-1]) """ if "file_extension" in kwargs: fext = kwargs.pop("file_extension") fext = fext.replace(".", "") else: fext = "png" masked_values = kwargs.pop("masked_values", []) if self.model is not None: if hasattr(self.model, "bas6") and self.model.bas6 is not None: masked_values.append(self.model.bas6.hnoflo) kwargs["masked_values"] = masked_values filenames = None if filename_base is not None: if mflay is not None: i0 = int(mflay) if i0 + 1 >= self.nlay: i0 = self.nlay - 1 i1 = i0 + 1 else: i0 = 0 i1 = self.nlay filenames = [ f"{filename_base}_Layer{k + 1}.{fext}" for k in range(i0, i1) ] # make sure we have a (lay,row,col) shape plotarray plotarray = np.atleast_3d( self.get_data( kstpkper=kstpkper, totim=totim, mflay=mflay ).transpose() ).transpose() from ..plot.plotutil import PlotUtilities return PlotUtilities._plot_array_helper( plotarray, model=self.model, axes=axes, filenames=filenames, mflay=mflay, modelgrid=self.mg, **kwargs, )
def _build_index(self): """ Build the recordarray and iposarray, which maps the header information to the position in the formatted file. """ raise Exception( "Abstract method _build_index called in LayerFile. " "This method needs to be overridden." )
[docs] def list_records(self): """ Print a list of all of the records in the file obj.list_records() """ for header in self.recordarray: print(header) return
def _get_data_array(self, totim=0): """ Get the three dimensional data array for the specified kstp and kper value or totim value. """ if totim >= 0.0: keyindices = np.where((self.recordarray["totim"] == totim))[0] if len(keyindices) == 0: msg = f"totim value ({totim}) not found in file..." raise Exception(msg) else: raise Exception("Data not found...") # initialize head with nan and then fill it idx = keyindices[0] nrow = self.recordarray["nrow"][idx] ncol = self.recordarray["ncol"][idx] data = np.empty((self.nlay, nrow, ncol), dtype=self.realtype) data[:, :, :] = np.nan for idx in keyindices: ipos = self.iposarray[idx] ilay = self.recordarray["ilay"][idx] if self.verbose: msg = f"Byte position in file: {ipos} for layer {ilay}" print(msg) self.file.seek(ipos, 0) nrow = self.recordarray["nrow"][idx] ncol = self.recordarray["ncol"][idx] shp = (nrow, ncol) data[ilay - 1] = self._read_data(shp) return data
[docs] def get_times(self): """ Get a list of unique times in the file Returns ---------- out : list of floats List contains unique simulation times (totim) in binary file. """ return self.times
[docs] def get_kstpkper(self): """ Get a list of unique stress periods and time steps in the file Returns ---------- out : list of (kstp, kper) tuples List of unique kstp, kper combinations in binary file. kstp and kper values are presently zero-based. """ kstpkper = [] for kstp, kper in self.kstpkper: kstpkper.append((kstp - 1, kper - 1)) return kstpkper
[docs] def get_data(self, kstpkper=None, idx=None, totim=None, mflay=None): """ Get data from the file for the specified conditions. Parameters ---------- idx : int The zero-based record number. The first record is record 0. kstpkper : tuple of ints A tuple containing the time step and stress period (kstp, kper). These are zero-based kstp and kper values. totim : float The simulation time. mflay : integer MODFLOW zero-based layer number to return. If None, then all all layers will be included. (Default is None.) Returns ---------- data : numpy array Array has size (nlay, nrow, ncol) if mflay is None or it has size (nrow, ncol) if mlay is specified. See Also -------- Notes ----- if both kstpkper and totim are None, will return the last entry Examples -------- """ # One-based kstp and kper for pulling out of recarray if kstpkper is not None: kstp1 = kstpkper[0] + 1 kper1 = kstpkper[1] + 1 idx = np.where( (self.recordarray["kstp"] == kstp1) & (self.recordarray["kper"] == kper1) ) if idx[0].shape[0] == 0: raise Exception( f"get_data() error: kstpkper not found:{kstpkper}" ) totim1 = self.recordarray[idx]["totim"][0] elif totim is not None: totim1 = totim elif idx is not None: totim1 = self.recordarray["totim"][idx] else: totim1 = self.times[-1] data = self._get_data_array(totim1) if mflay is None: return data else: return data[mflay, :, :]
[docs] def get_alldata(self, mflay=None, nodata=-9999): """ Get all of the data from the file. Parameters ---------- mflay : integer MODFLOW zero-based layer number to return. If None, then all all layers will be included. (Default is None.) nodata : float The nodata value in the data array. All array values that have the nodata value will be assigned np.nan. Returns ---------- data : numpy array Array has size (ntimes, nlay, nrow, ncol) if mflay is None or it has size (ntimes, nrow, ncol) if mlay is specified. See Also -------- Notes ----- Examples -------- """ rv = [] for totim in self.times: h = self.get_data(totim=totim, mflay=mflay) rv.append(h) rv = np.array(rv) rv[rv == nodata] = np.nan return rv
def _read_data(self, shp): """ Read data from file """ raise Exception( "Abstract method _read_data called in LayerFile. " "This method needs to be overridden." ) def _build_kijlist(self, idx): if isinstance(idx, list): kijlist = idx elif isinstance(idx, tuple): kijlist = [idx] else: raise Exception("Could not build kijlist from ", idx) # Check to make sure that k, i, j are within range, otherwise # the seek approach won't work. Can't use k = -1, for example. for k, i, j in kijlist: fail = False if k < 0 or k > self.nlay - 1: fail = True if i < 0 or i > self.nrow - 1: fail = True if j < 0 or j > self.ncol - 1: fail = True if fail: raise Exception( "Invalid cell index. Cell {} not within model grid: " "{}".format((k, i, j), (self.nlay, self.nrow, self.ncol)) ) return kijlist def _get_nstation(self, idx, kijlist): if isinstance(idx, list): return len(kijlist) elif isinstance(idx, tuple): return 1 def _init_result(self, nstation): # Initialize result array and put times in first column result = np.empty((len(self.times), nstation + 1), dtype=self.realtype) result[:, :] = np.nan result[:, 0] = np.array(self.times) return result
[docs] def close(self): """ Close the file handle. """ self.file.close() return