Source code for flopy.utils.datafile

"""
Module to read MODFLOW output files.  The module contains shared
abstract classes that should not be directly accessed.

"""

# in LayerFile, the recordarray attribute begins its life as
# a list, which is appended to in subclasses' build_index(),
# then finally becomes an array, after which it's accessed
# in this file by column name. this probably deserves some
# attention, but in the meantime, disable the pylint rule
# to appease codacy.
#
# pylint: disable=invalid-sequence-index

import os
import warnings
from pathlib import Path
from typing import Union

import numpy as np





[docs]class LayerFile: """ Base class for layered output files. Do not instantiate directly. """ def __init__(self, filename: Union[str, os.PathLike], precision, verbose, **kwargs): from ..discretization.structuredgrid import StructuredGrid self.filename = Path(filename).expanduser().absolute() self.precision = precision self.verbose = verbose self.file = open(self.filename, "rb") # Get filesize to ensure this is not an empty file self.file.seek(0, 2) totalbytes = self.file.tell() self.file.seek(0, 0) # reset to beginning assert self.file.tell() == 0 if totalbytes == 0: raise ValueError(f"datafile error: file is empty: {filename}") self.nrow = 0 self.ncol = 0 self.nlay = 0 self.times = [] self.kstpkper = [] self.recordarray = [] self.iposarray = [] if precision == "single": self.realtype = np.float32 elif precision == "double": self.realtype = np.float64 else: raise Exception(f"Unknown precision specified: {precision}") self.model = None self.dis = None self.mg = None if "model" in kwargs.keys(): self.model = kwargs.pop("model") self.mg = self.model.modelgrid self.dis = self.model.dis if "dis" in kwargs.keys(): self.dis = kwargs.pop("dis") self.mg = self.dis.parent.modelgrid if "tdis" in kwargs.keys(): self.tdis = kwargs.pop("tdis") if "modelgrid" in kwargs.keys(): self.mg = kwargs.pop("modelgrid") if len(kwargs.keys()) > 0: args = ",".join(kwargs.keys()) raise Exception(f"LayerFile error: unrecognized kwargs: {args}") # read through the file and build the pointer index self._build_index() # now that we read the data and know nrow and ncol, # we can make a generic mg if needed if self.mg is None: self.mg = StructuredGrid( delc=np.ones((self.nrow,)), delr=np.ones(self.ncol), nlay=self.nlay, xoff=0.0, yoff=0.0, angrot=0.0, ) def __len__(self): """ Return the number of records (headers) in the file. """ return len(self.recordarray) def __enter__(self): return self def __exit__(self, *exc): self.close()
[docs] def to_shapefile( self, filename: Union[str, os.PathLike], kstpkper=None, totim=None, mflay=None, attrib_name="lf_data", verbose=False, ): """ Export model output data to a shapefile at a specific location in LayerFile instance. Parameters ---------- filename : str or PathLike Shapefile path to write kstpkper : tuple of ints A tuple containing the time step and stress period (kstp, kper). These are zero-based kstp and kper values. totim : float The simulation time. mflay : integer MODFLOW zero-based layer number to return. If None, then layer 1 will be written attrib_name : str Base name of attribute columns. (default is 'lf_data') verbose : bool Whether to print verbose output Returns ------- None See Also -------- Notes ----- Examples -------- >>> import flopy >>> hdobj = flopy.utils.HeadFile('test.hds') >>> times = hdobj.get_times() >>> hdobj.to_shapefile('test_heads_sp6.shp', totim=times[-1]) """ plotarray = np.atleast_3d( self.get_data(kstpkper=kstpkper, totim=totim, mflay=mflay).transpose() ).transpose() if mflay is not None: attrib_dict = {f"{attrib_name}{mflay}": plotarray[0, :, :]} else: attrib_dict = {} for k in range(plotarray.shape[0]): name = f"{attrib_name}{k}" attrib_dict[name] = plotarray[k] from ..export.shapefile_utils import write_grid_shapefile write_grid_shapefile(filename, self.mg, attrib_dict, verbose)
[docs] def plot( self, axes=None, kstpkper=None, totim=None, mflay=None, filename_base=None, **kwargs, ): """ Plot 3-D model output data in a specific location in LayerFile instance Parameters ---------- axes : list of matplotlib.pyplot.axis List of matplotlib.pyplot.axis that will be used to plot data for each layer. If axes=None axes will be generated. (default is None) kstpkper : tuple of ints A tuple containing the time step and stress period (kstp, kper). These are zero-based kstp and kper values. totim : float The simulation time. mflay : int MODFLOW zero-based layer number to return. If None, then all all layers will be included. (default is None) filename_base : str Base file name that will be used to automatically generate file names for output image files. Plots will be exported as image files if file_name_base is not None. (default is None) **kwargs : dict pcolor : bool Boolean used to determine if matplotlib.pyplot.pcolormesh plot will be plotted. (default is True) colorbar : bool Boolean used to determine if a color bar will be added to the matplotlib.pyplot.pcolormesh. Only used if pcolor=True. (default is False) contour : bool Boolean used to determine if matplotlib.pyplot.contour plot will be plotted. (default is False) clabel : bool Boolean used to determine if matplotlib.pyplot.clabel will be plotted. Only used if contour=True. (default is False) grid : bool Boolean used to determine if the model grid will be plotted on the figure. (default is False) masked_values : list List of unique values to be excluded from the plot. file_extension : str Valid matplotlib.pyplot file extension for savefig(). Only used if filename_base is not None. (default is 'png') Returns ------- None See Also -------- Notes ----- Examples -------- >>> import flopy >>> hdobj = flopy.utils.HeadFile('test.hds') >>> times = hdobj.get_times() >>> hdobj.plot(totim=times[-1]) """ if "file_extension" in kwargs: fext = kwargs.pop("file_extension") fext = fext.replace(".", "") else: fext = "png" masked_values = kwargs.pop("masked_values", []) if self.model is not None: if hasattr(self.model, "bas6") and self.model.bas6 is not None: masked_values.append(self.model.bas6.hnoflo) kwargs["masked_values"] = masked_values filenames = None if filename_base is not None: if mflay is not None: i0 = int(mflay) if i0 + 1 >= self.nlay: i0 = self.nlay - 1 i1 = i0 + 1 else: i0 = 0 i1 = self.nlay filenames = [f"{filename_base}_Layer{k + 1}.{fext}" for k in range(i0, i1)] # make sure we have a (lay,row,col) shape plotarray plotarray = np.atleast_3d( self.get_data(kstpkper=kstpkper, totim=totim, mflay=mflay).transpose() ).transpose() from ..plot.plotutil import PlotUtilities return PlotUtilities._plot_array_helper( plotarray, model=self.model, axes=axes, filenames=filenames, mflay=mflay, modelgrid=self.mg, **kwargs, )
def _build_index(self): """ Build the recordarray and iposarray, which maps the header information to the position in the formatted file. """ raise NotImplementedError( "Abstract method _build_index called in LayerFile. " "This method needs to be overridden." )
[docs] def list_records(self): """ Print a list of all of the records in the file .. deprecated:: 3.8.0 Use :attr:`headers` instead. """ warnings.warn( "list_records() is deprecated; use headers instead.", DeprecationWarning, ) for header in self.recordarray: print(header) return
[docs] def get_nrecords(self): """ Return the number of records (headers) in the file. .. deprecated:: 3.8.0 Use :meth:`len` instead. """ warnings.warn( "get_nrecords is deprecated; use len(obj) instead.", DeprecationWarning, ) return len(self)
def _get_data_array(self, totim=0): """ Get the three dimensional data array for the specified kstp and kper value or totim value. """ if totim >= 0.0: keyindices = np.asarray(self.recordarray["totim"] == totim).nonzero()[0] if len(keyindices) == 0: msg = f"totim value ({totim}) not found in file..." raise Exception(msg) else: raise Exception("Data not found...") # initialize head with nan and then fill it idx = keyindices[0] nrow = self.recordarray["nrow"][idx] ncol = self.recordarray["ncol"][idx] data = np.empty((self.nlay, nrow, ncol), dtype=self.realtype) data[:, :, :] = np.nan for idx in keyindices: ipos = self.iposarray[idx] ilay = self.recordarray["ilay"][idx] if self.verbose: msg = f"Byte position in file: {ipos} for layer {ilay}" print(msg) self.file.seek(ipos, 0) nrow = self.recordarray["nrow"][idx] ncol = self.recordarray["ncol"][idx] shp = (nrow, ncol) data[ilay - 1] = self._read_data(shp) return data
[docs] def get_times(self): """ Get a list of unique times in the file Returns ------- out : list of floats List contains unique simulation times (totim) in binary file. """ return self.times
[docs] def get_kstpkper(self): """ Get a list of unique tuples (stress period, time step) in the file. Indices are 0-based, use the `kstpkper` attribute for 1-based. Returns ------- list of (kstp, kper) tuples List of unique combinations of stress period & time step indices (0-based) in the binary file """ return [(kstp - 1, kper - 1) for kstp, kper in self.kstpkper]
[docs] def get_data(self, kstpkper=None, idx=None, totim=None, mflay=None): """ Get data from the file for the specified conditions. Parameters ---------- idx : int The zero-based record number. The first record is record 0. kstpkper : tuple of ints A tuple (kstep, kper) of zero-based time step and stress period. totim : float The simulation time. mflay : integer MODFLOW zero-based layer number to return. If None, then all all layers will be included. (Default is None.) Returns ------- data : numpy array Array has size (nlay, nrow, ncol) if mflay is None or it has size (nrow, ncol) if mlay is specified. Notes ----- If both kstpkper and totim are None, the last entry will be returned. """ # One-based kstp and kper for pulling out of recarray if kstpkper is not None: kstp1 = kstpkper[0] + 1 kper1 = kstpkper[1] + 1 idx = np.asarray( (self.recordarray["kstp"] == kstp1) & (self.recordarray["kper"] == kper1) ).nonzero() if idx[0].shape[0] == 0: raise Exception(f"get_data() error: kstpkper not found:{kstpkper}") totim1 = self.recordarray[idx]["totim"][0] elif totim is not None: totim1 = totim elif idx is not None: totim1 = self.recordarray["totim"][idx] else: totim1 = self.times[-1] data = self._get_data_array(totim1) if mflay is None: return data else: return data[mflay, :, :]
[docs] def get_alldata(self, mflay=None, nodata=-9999): """ Get all of the data from the file. Parameters ---------- mflay : integer MODFLOW zero-based layer number to return. If None, then all all layers will be included. (Default is None.) nodata : float The nodata value in the data array. All array values that have the nodata value will be assigned np.nan. Returns ------- data : numpy array Array has size (ntimes, nlay, nrow, ncol) if mflay is None or it has size (ntimes, nrow, ncol) if mlay is specified. See Also -------- Notes ----- Examples -------- """ rv = [] for totim in self.times: h = self.get_data(totim=totim, mflay=mflay) rv.append(h) rv = np.array(rv) rv[rv == nodata] = np.nan return rv
def _read_data(self, shp): """ Read data from file """ raise NotImplementedError( "Abstract method _read_data called in LayerFile. " "This method needs to be overridden." ) def _build_kijlist(self, idx): if isinstance(idx, list): kijlist = idx elif isinstance(idx, tuple): kijlist = [idx] else: raise Exception("Could not build kijlist from ", idx) # Check to make sure that k, i, j are within range, otherwise # the seek approach won't work. Can't use k = -1, for example. for k, i, j in kijlist: fail = False if k < 0 or k > self.nlay - 1: fail = True if i < 0 or i > self.nrow - 1: fail = True if j < 0 or j > self.ncol - 1: fail = True if fail: raise Exception( "Invalid cell index. Cell {} not within model grid: {}".format( (k, i, j), (self.nlay, self.nrow, self.ncol) ) ) return kijlist def _get_nstation(self, idx, kijlist): if isinstance(idx, list): return len(kijlist) elif isinstance(idx, tuple): return 1 def _init_result(self, nstation): # Initialize result array and put times in first column result = np.empty((len(self.times), nstation + 1), dtype=self.realtype) result[:, :] = np.nan result[:, 0] = np.array(self.times) return result
[docs] def close(self): """ Close the file handle. """ self.file.close()