Source code for flopy.utils.datafile

"""
Module to read MODFLOW output files.  The module contains shared
abstract classes that should not be directly accessed.

"""

# in LayerFile, the recordarray attribute begins its life as
# a list, which is appended to in subclasses' build_index(),
# then finally becomes an array, after which it's accessed
# in this file by column name. this probably deserves some
# attention, but in the meantime, disable the pylint rule
# to appease codacy.
#
# pylint: disable=invalid-sequence-index

import warnings
from os import PathLike
from pathlib import Path
from typing import Union

import numpy as np





[docs]class LayerFile: """ Base class for layered output files. Do not instantiate directly. """ def __init__(self, filename: Union[str, PathLike], precision, verbose, **kwargs): from ..discretization.structuredgrid import StructuredGrid self.filename = Path(filename).expanduser().absolute() self.precision = precision self.verbose = verbose self.file = open(self.filename, "rb") # Get filesize to ensure this is not an empty file self.file.seek(0, 2) totalbytes = self.file.tell() self.file.seek(0, 0) # reset to beginning assert self.file.tell() == 0 if totalbytes == 0: raise ValueError(f"datafile error: file is empty: {filename}") self.nrow = 0 self.ncol = 0 self.nlay = 0 self.times = [] self.kstpkper = [] self.recordarray = [] self.iposarray = [] if precision == "single": self.realtype = np.float32 elif precision == "double": self.realtype = np.float64 else: raise ValueError(f"Unknown precision specified: {precision}") self.model = None self.dis = None self.modelgrid = None if "model" in kwargs.keys(): self.model = kwargs.pop("model") self.modelgrid = self.model.modelgrid self.dis = self.model.dis if "dis" in kwargs.keys(): self.dis = kwargs.pop("dis") self.modelgrid = self.dis.parent.modelgrid if "tdis" in kwargs.keys(): self.tdis = kwargs.pop("tdis") if "modelgrid" in kwargs.keys(): self.modelgrid = kwargs.pop("modelgrid") if len(kwargs.keys()) > 0: args = ",".join(kwargs.keys()) raise ValueError(f"LayerFile error: unrecognized kwargs: {args}") # read through the file and build the pointer index self._build_index() # now that we read the data and know nrow and ncol, # we can make a generic modelgrid if needed if self.modelgrid is None: self.modelgrid = StructuredGrid( delc=np.ones((self.nrow,)), delr=np.ones(self.ncol), nlay=self.nlay, xoff=0.0, yoff=0.0, angrot=0.0, ) def __len__(self): """ Return the number of records (headers) in the file. """ return len(self.recordarray) def __enter__(self): return self def __exit__(self, *exc): self.close()
[docs] def to_geodataframe( self, gdf=None, modelgrid=None, kstpkper=None, totim=None, attrib_name=None ): """ Generate a GeoDataFrame with data from a LayerFile instance Parameters ---------- gdf : GeoDataFrame optional, existing geodataframe with NCPL geometries modelgrid : Grid optional modelgrid instance to generate a GeoDataFrame from kstpkper : tuple of ints A tuple containing the time step and stress period (kstp, kper). These are zero-based kstp and kper values. totim : float The simulation time. attrib_name : str optional base name of attribute columns. (default is text attribute) Returns ------- GeoDataFrame """ if gdf is None: if modelgrid is None: if self.modelgrid is None: raise AssertionError( "A geodataframe or modelgrid instance must be supplied" ) modelgrid = self.modelgrid gdf = modelgrid.to_geodataframe() array = np.atleast_3d( self.get_data(kstpkper=kstpkper, totim=totim).transpose() ).transpose() if attrib_name is None: attrib_name = self.text.decode() for ix, arr in enumerate(array): name = f"{attrib_name}_{ix}" gdf[name] = np.ravel(arr) return gdf
[docs] def to_shapefile( self, filename: Union[str, PathLike], kstpkper=None, totim=None, mflay=None, attrib_name="lf_data", verbose=False, ): """ Export model output data to a shapefile at a specific location in LayerFile instance. Parameters ---------- filename : str or PathLike Shapefile path to write kstpkper : tuple of ints A tuple containing the time step and stress period (kstp, kper). These are zero-based kstp and kper values. totim : float The simulation time. mflay : integer MODFLOW zero-based layer number to return. If None, then layer 1 will be written attrib_name : str Base name of attribute columns. (default is 'lf_data') verbose : bool Whether to print verbose output Returns ------- None See Also -------- Notes ----- Examples -------- >>> import flopy >>> hdobj = flopy.utils.HeadFile('test.hds') >>> times = hdobj.get_times() >>> hdobj.to_shapefile('test_heads_sp6.shp', totim=times[-1]) """ warnings.warn( "to_shapefile() is deprecated and is being replaced by to_geodataframe()", DeprecationWarning, ) plotarray = np.atleast_3d( self.get_data(kstpkper=kstpkper, totim=totim, mflay=mflay).transpose() ).transpose() if mflay is not None: attrib_dict = {f"{attrib_name}{mflay}": plotarray[0, :, :]} else: attrib_dict = {} for k in range(plotarray.shape[0]): name = f"{attrib_name}{k}" attrib_dict[name] = plotarray[k] from ..export.shapefile_utils import write_grid_shapefile write_grid_shapefile(filename, self.modelgrid, attrib_dict, verbose)
[docs] def plot( self, axes=None, kstpkper=None, totim=None, mflay=None, filename_base=None, **kwargs, ): """ Plot 3-D model output data in a specific location in LayerFile instance Parameters ---------- axes : list of matplotlib.pyplot.axis List of matplotlib.pyplot.axis that will be used to plot data for each layer. If axes=None axes will be generated. (default is None) kstpkper : tuple of ints A tuple containing the time step and stress period (kstp, kper). These are zero-based kstp and kper values. totim : float The simulation time. mflay : int MODFLOW zero-based layer number to return. If None, then all all layers will be included. (default is None) filename_base : str Base file name that will be used to automatically generate file names for output image files. Plots will be exported as image files if file_name_base is not None. (default is None) **kwargs : dict pcolor : bool Boolean used to determine if matplotlib.pyplot.pcolormesh plot will be plotted. (default is True) colorbar : bool Boolean used to determine if a color bar will be added to the matplotlib.pyplot.pcolormesh. Only used if pcolor=True. (default is False) contour : bool Boolean used to determine if matplotlib.pyplot.contour plot will be plotted. (default is False) clabel : bool Boolean used to determine if matplotlib.pyplot.clabel will be plotted. Only used if contour=True. (default is False) grid : bool Boolean used to determine if the model grid will be plotted on the figure. (default is False) masked_values : list List of unique values to be excluded from the plot. file_extension : str Valid matplotlib.pyplot file extension for savefig(). Only used if filename_base is not None. (default is 'png') Returns ------- None See Also -------- Notes ----- Examples -------- >>> import flopy >>> hdobj = flopy.utils.HeadFile('test.hds') >>> times = hdobj.get_times() >>> hdobj.plot(totim=times[-1]) """ if "file_extension" in kwargs: fext = kwargs.pop("file_extension") fext = fext.replace(".", "") else: fext = "png" masked_values = kwargs.pop("masked_values", []) if self.model is not None: if hasattr(self.model, "bas6") and self.model.bas6 is not None: masked_values.append(self.model.bas6.hnoflo) kwargs["masked_values"] = masked_values filenames = None if filename_base is not None: if mflay is not None: i0 = int(mflay) if i0 + 1 >= self.nlay: i0 = self.nlay - 1 i1 = i0 + 1 else: i0 = 0 i1 = self.nlay filenames = [f"{filename_base}_Layer{k + 1}.{fext}" for k in range(i0, i1)] # make sure we have a (lay,row,col) shape plotarray plotarray = np.atleast_3d( self.get_data(kstpkper=kstpkper, totim=totim, mflay=mflay).transpose() ).transpose() from ..plot.plotutil import PlotUtilities return PlotUtilities._plot_array_helper( plotarray, model=self.model, axes=axes, filenames=filenames, mflay=mflay, modelgrid=self.modelgrid, **kwargs, )
def _build_index(self): """ Build the recordarray and iposarray, which maps the header information to the position in the formatted file. """ raise NotImplementedError( "Abstract method _build_index called in LayerFile. " "This method needs to be overridden." )
[docs] def list_records(self): """ Print a list of all of the records in the file .. deprecated:: 3.8.0 Use :attr:`headers` instead. """ warnings.warn( "list_records() is deprecated; use headers instead.", DeprecationWarning, ) for header in self.recordarray: print(header) return
[docs] def get_nrecords(self): """ Return the number of records (headers) in the file. .. deprecated:: 3.8.0 Use :meth:`len` instead. """ warnings.warn( "get_nrecords is deprecated; use len(obj) instead.", DeprecationWarning, ) return len(self)
def _get_data_array(self, totim=0): """ Get the three dimensional data array for the specified kstp and kper value or totim value. """ if totim >= 0.0: keyindices = np.asarray(self.recordarray["totim"] == totim).nonzero()[0] if len(keyindices) == 0: raise ValueError(f"totim value ({totim}) not found in file") else: raise ValueError("Data not found") # initialize head with nan and then fill it idx = keyindices[0] nrow = self.recordarray["nrow"][idx] ncol = self.recordarray["ncol"][idx] data = np.empty((self.nlay, nrow, ncol), dtype=self.realtype) data[:, :, :] = np.nan for idx in keyindices: ipos = self.iposarray[idx] ilay = self.recordarray["ilay"][idx] if self.verbose: msg = f"Byte position in file: {ipos} for layer {ilay}" print(msg) self.file.seek(ipos, 0) nrow = self.recordarray["nrow"][idx] ncol = self.recordarray["ncol"][idx] shp = (nrow, ncol) data[ilay - 1] = self._read_data(shp) return data
[docs] def get_times(self): """ Get a list of unique times in the file Returns ------- out : list of floats List contains unique simulation times (totim) in binary file. """ return self.times
[docs] def get_kstpkper(self): """ Get a list of unique tuples (stress period, time step) in the file. Indices are 0-based, use the `kstpkper` attribute for 1-based. Returns ------- list of (kstp, kper) tuples List of unique combinations of stress period & time step indices (0-based) in the binary file """ return [(kstp - 1, kper - 1) for kstp, kper in self.kstpkper]
[docs] def get_data(self, kstpkper=None, idx=None, totim=None, mflay=None): """ Get data from the file for the specified conditions. Parameters ---------- idx : int The zero-based record number. The first record is record 0. kstpkper : tuple of ints A tuple (kstep, kper) of zero-based time step and stress period. totim : float The simulation time. mflay : integer MODFLOW zero-based layer number to return. If None, then all all layers will be included. (Default is None.) Returns ------- data : numpy array Array has size (nlay, nrow, ncol) if mflay is None or it has size (nrow, ncol) if mlay is specified. Notes ----- If both kstpkper and totim are None, the last entry will be returned. """ # One-based kstp and kper for pulling out of recarray if kstpkper is not None: kstp1 = kstpkper[0] + 1 kper1 = kstpkper[1] + 1 idx = np.asarray( (self.recordarray["kstp"] == kstp1) & (self.recordarray["kper"] == kper1) ).nonzero() if idx[0].shape[0] == 0: raise ValueError(f"get_data() error: kstpkper not found: {kstpkper}") totim1 = self.recordarray[idx]["totim"][0] elif totim is not None: totim1 = totim elif idx is not None: totim1 = self.recordarray["totim"][idx] else: totim1 = self.times[-1] data = self._get_data_array(totim1) if mflay is None: return data elif isinstance(data, list): # unstructured model, list form return data[mflay] else: return data[mflay, :, :] # structured model, np.array form
[docs] def get_alldata(self, mflay=None, nodata=-9999): """ Get all of the data from the file. Parameters ---------- mflay : integer MODFLOW zero-based layer number to return. If None, then all all layers will be included. (Default is None.) nodata : float The nodata value in the data array. All array values that have the nodata value will be assigned np.nan. Returns ------- data : numpy array Array has size (ntimes, nlay, nrow, ncol) if mflay is None or it has size (ntimes, nrow, ncol) if mlay is specified. See Also -------- Notes ----- Examples -------- """ rv = [] for totim in self.times: h = self.get_data(totim=totim, mflay=mflay) rv.append(h) rv = np.array(rv) rv[rv == nodata] = np.nan return rv
def _read_data(self, shp): """ Read data from file """ raise NotImplementedError( "Abstract method _read_data called in LayerFile. " "This method needs to be overridden." ) def _build_kijlist(self, idx): """Build normalized cell index list based on grid type. Accepts natural index formats for each grid type: - DIS (structured): (k, i, j) or list of such - DISV (vertex): (k, cellid) or list of such - DISU (unstructured): node or list of such For backwards compatibility, also accepts old 3-tuple format with dummy values: (k, dummy, cellid) for DISV and (dummy, dummy, node) for DISU. Returns: - DIS: list of 3-tuples (k, i, j) - DISV: list of 2-tuples (k, cellid) - DISU: list of integers (node) """ # Determine grid type grid_type = "structured" if self.modelgrid is None else self.modelgrid.grid_type # Normalize idx to a list if isinstance(idx, int): idx_list = [idx] elif isinstance(idx, tuple): idx_list = [idx] elif isinstance(idx, list): idx_list = idx else: raise ValueError(f"Could not build kijlist from {idx}") kijlist = [] for item in idx_list: if grid_type == "structured": # DIS: expect 3-tuple (k, i, j) if not isinstance(item, tuple) or len(item) != 3: raise ValueError( f"DIS structured grid requires 3-tuple (layer, row, col), " f"got: {item}" ) k, i, j = item # Validate ranges if k < 0 or k > self.nlay - 1: raise ValueError(f"Layer index {k} out of range [0, {self.nlay})") if i < 0 or i > self.nrow - 1: raise ValueError(f"Row index {i} out of range [0, {self.nrow})") if j < 0 or j > self.ncol - 1: raise ValueError(f"Column index {j} out of range [0, {self.ncol})") kijlist.append((k, i, j)) elif grid_type == "vertex": if isinstance(item, tuple): if len(item) == 2: # proper format: (layer, cellid) k, cell = item elif len(item) == 3: # old format: (layer, dummy, cellid) k, cell = item[0], item[2] else: raise ValueError( f"DISV vertex grid requires 2-tuple (layer, cellid) " f"or 3-tuple (layer, dummy, cellid), got: {item}" ) else: raise ValueError( f"DISV vertex grid requires 2-tuple (layer, cellid) " f"or 3-tuple (layer, dummy, cellid), got: {item}" ) if k < 0 or k >= self.nlay: raise ValueError(f"Layer index {k} out of range [0, {self.nlay})") if cell < 0 or cell >= self.modelgrid.ncpl: raise ValueError( f"Cell index {cell} out of range [0, {self.modelgrid.ncpl})" ) # Store as 2-tuple for DISV kijlist.append((k, cell)) else: if isinstance(item, (int, np.integer)): # proper format: just the node number node = int(item) elif isinstance(item, tuple): if len(item) == 3: # old format: (dummy, dummy, node) node = int(item[2]) elif len(item) == 1: # Also support single-element tuple node = int(item[0]) else: raise ValueError( f"DISU unstructured grid requires integer node index " f"or 3-tuple (dummy, dummy, node), got: {item}" ) else: raise ValueError( f"DISU unstructured grid requires integer node index " f"or 3-tuple (dummy, dummy, node), got: {item}" ) if node < 0 or node >= self.modelgrid.nnodes: raise ValueError( f"Node index {node} out of range [0, {self.modelgrid.nnodes})" ) # Store as integer for DISU kijlist.append(node) return kijlist def _get_nstation(self, idx, kijlist): if isinstance(idx, list): return len(kijlist) elif isinstance(idx, tuple): return 1 elif isinstance(idx, (int, np.integer)): return 1 # Single DISU node else: return None def _init_result(self, nstation): # Initialize result array and put times in first column result = np.empty((len(self.times), nstation + 1), dtype=self.realtype) result[:, :] = np.nan result[:, 0] = np.array(self.times) return result
[docs] def close(self): """ Close the file handle. """ self.file.close()