"""
Module to read MODFLOW output files. The module contains shared
abstract classes that should not be directly accessed.
"""
# in LayerFile, the recordarray attribute begins its life as
# a list, which is appended to in subclasses' build_index(),
# then finally becomes an array, after which it's accessed
# in this file by column name. this probably deserves some
# attention, but in the meantime, disable the pylint rule
# to appease codacy.
#
# pylint: disable=invalid-sequence-index
import warnings
from os import PathLike
from pathlib import Path
from typing import Union
import numpy as np
[docs]class LayerFile:
"""
Base class for layered output files.
Do not instantiate directly.
"""
def __init__(self, filename: Union[str, PathLike], precision, verbose, **kwargs):
from ..discretization.structuredgrid import StructuredGrid
self.filename = Path(filename).expanduser().absolute()
self.precision = precision
self.verbose = verbose
self.file = open(self.filename, "rb")
# Get filesize to ensure this is not an empty file
self.file.seek(0, 2)
totalbytes = self.file.tell()
self.file.seek(0, 0) # reset to beginning
assert self.file.tell() == 0
if totalbytes == 0:
raise ValueError(f"datafile error: file is empty: {filename}")
self.nrow = 0
self.ncol = 0
self.nlay = 0
self.times = []
self.kstpkper = []
self.recordarray = []
self.iposarray = []
if precision == "single":
self.realtype = np.float32
elif precision == "double":
self.realtype = np.float64
else:
raise ValueError(f"Unknown precision specified: {precision}")
self.model = None
self.dis = None
self.modelgrid = None
if "model" in kwargs.keys():
self.model = kwargs.pop("model")
self.modelgrid = self.model.modelgrid
self.dis = self.model.dis
if "dis" in kwargs.keys():
self.dis = kwargs.pop("dis")
self.modelgrid = self.dis.parent.modelgrid
if "tdis" in kwargs.keys():
self.tdis = kwargs.pop("tdis")
if "modelgrid" in kwargs.keys():
self.modelgrid = kwargs.pop("modelgrid")
if len(kwargs.keys()) > 0:
args = ",".join(kwargs.keys())
raise ValueError(f"LayerFile error: unrecognized kwargs: {args}")
# read through the file and build the pointer index
self._build_index()
# now that we read the data and know nrow and ncol,
# we can make a generic modelgrid if needed
if self.modelgrid is None:
self.modelgrid = StructuredGrid(
delc=np.ones((self.nrow,)),
delr=np.ones(self.ncol),
nlay=self.nlay,
xoff=0.0,
yoff=0.0,
angrot=0.0,
)
def __len__(self):
"""
Return the number of records (headers) in the file.
"""
return len(self.recordarray)
def __enter__(self):
return self
def __exit__(self, *exc):
self.close()
[docs] def to_geodataframe(
self, gdf=None, modelgrid=None, kstpkper=None, totim=None, attrib_name=None
):
"""
Generate a GeoDataFrame with data from a LayerFile instance
Parameters
----------
gdf : GeoDataFrame
optional, existing geodataframe with NCPL geometries
modelgrid : Grid
optional modelgrid instance to generate a GeoDataFrame from
kstpkper : tuple of ints
A tuple containing the time step and stress period (kstp, kper).
These are zero-based kstp and kper values.
totim : float
The simulation time.
attrib_name : str
optional base name of attribute columns. (default is text attribute)
Returns
-------
GeoDataFrame
"""
if gdf is None:
if modelgrid is None:
if self.modelgrid is None:
raise AssertionError(
"A geodataframe or modelgrid instance must be supplied"
)
modelgrid = self.modelgrid
gdf = modelgrid.to_geodataframe()
array = np.atleast_3d(
self.get_data(kstpkper=kstpkper, totim=totim).transpose()
).transpose()
if attrib_name is None:
attrib_name = self.text.decode()
for ix, arr in enumerate(array):
name = f"{attrib_name}_{ix}"
gdf[name] = np.ravel(arr)
return gdf
[docs] def to_shapefile(
self,
filename: Union[str, PathLike],
kstpkper=None,
totim=None,
mflay=None,
attrib_name="lf_data",
verbose=False,
):
"""
Export model output data to a shapefile at a specific location
in LayerFile instance.
Parameters
----------
filename : str or PathLike
Shapefile path to write
kstpkper : tuple of ints
A tuple containing the time step and stress period (kstp, kper).
These are zero-based kstp and kper values.
totim : float
The simulation time.
mflay : integer
MODFLOW zero-based layer number to return. If None, then layer 1
will be written
attrib_name : str
Base name of attribute columns. (default is 'lf_data')
verbose : bool
Whether to print verbose output
Returns
-------
None
See Also
--------
Notes
-----
Examples
--------
>>> import flopy
>>> hdobj = flopy.utils.HeadFile('test.hds')
>>> times = hdobj.get_times()
>>> hdobj.to_shapefile('test_heads_sp6.shp', totim=times[-1])
"""
warnings.warn(
"to_shapefile() is deprecated and is being replaced by to_geodataframe()",
DeprecationWarning,
)
plotarray = np.atleast_3d(
self.get_data(kstpkper=kstpkper, totim=totim, mflay=mflay).transpose()
).transpose()
if mflay is not None:
attrib_dict = {f"{attrib_name}{mflay}": plotarray[0, :, :]}
else:
attrib_dict = {}
for k in range(plotarray.shape[0]):
name = f"{attrib_name}{k}"
attrib_dict[name] = plotarray[k]
from ..export.shapefile_utils import write_grid_shapefile
write_grid_shapefile(filename, self.modelgrid, attrib_dict, verbose)
[docs] def plot(
self,
axes=None,
kstpkper=None,
totim=None,
mflay=None,
filename_base=None,
**kwargs,
):
"""
Plot 3-D model output data in a specific location
in LayerFile instance
Parameters
----------
axes : list of matplotlib.pyplot.axis
List of matplotlib.pyplot.axis that will be used to plot
data for each layer. If axes=None axes will be generated.
(default is None)
kstpkper : tuple of ints
A tuple containing the time step and stress period (kstp, kper).
These are zero-based kstp and kper values.
totim : float
The simulation time.
mflay : int
MODFLOW zero-based layer number to return. If None, then all
all layers will be included. (default is None)
filename_base : str
Base file name that will be used to automatically generate file
names for output image files. Plots will be exported as image
files if file_name_base is not None. (default is None)
**kwargs : dict
pcolor : bool
Boolean used to determine if matplotlib.pyplot.pcolormesh
plot will be plotted. (default is True)
colorbar : bool
Boolean used to determine if a color bar will be added to
the matplotlib.pyplot.pcolormesh. Only used if pcolor=True.
(default is False)
contour : bool
Boolean used to determine if matplotlib.pyplot.contour
plot will be plotted. (default is False)
clabel : bool
Boolean used to determine if matplotlib.pyplot.clabel
will be plotted. Only used if contour=True. (default is False)
grid : bool
Boolean used to determine if the model grid will be plotted
on the figure. (default is False)
masked_values : list
List of unique values to be excluded from the plot.
file_extension : str
Valid matplotlib.pyplot file extension for savefig(). Only used
if filename_base is not None. (default is 'png')
Returns
-------
None
See Also
--------
Notes
-----
Examples
--------
>>> import flopy
>>> hdobj = flopy.utils.HeadFile('test.hds')
>>> times = hdobj.get_times()
>>> hdobj.plot(totim=times[-1])
"""
if "file_extension" in kwargs:
fext = kwargs.pop("file_extension")
fext = fext.replace(".", "")
else:
fext = "png"
masked_values = kwargs.pop("masked_values", [])
if self.model is not None:
if hasattr(self.model, "bas6") and self.model.bas6 is not None:
masked_values.append(self.model.bas6.hnoflo)
kwargs["masked_values"] = masked_values
filenames = None
if filename_base is not None:
if mflay is not None:
i0 = int(mflay)
if i0 + 1 >= self.nlay:
i0 = self.nlay - 1
i1 = i0 + 1
else:
i0 = 0
i1 = self.nlay
filenames = [f"{filename_base}_Layer{k + 1}.{fext}" for k in range(i0, i1)]
# make sure we have a (lay,row,col) shape plotarray
plotarray = np.atleast_3d(
self.get_data(kstpkper=kstpkper, totim=totim, mflay=mflay).transpose()
).transpose()
from ..plot.plotutil import PlotUtilities
return PlotUtilities._plot_array_helper(
plotarray,
model=self.model,
axes=axes,
filenames=filenames,
mflay=mflay,
modelgrid=self.modelgrid,
**kwargs,
)
def _build_index(self):
"""
Build the recordarray and iposarray, which maps the header information
to the position in the formatted file.
"""
raise NotImplementedError(
"Abstract method _build_index called in LayerFile. "
"This method needs to be overridden."
)
[docs] def list_records(self):
"""
Print a list of all of the records in the file
.. deprecated:: 3.8.0
Use :attr:`headers` instead.
"""
warnings.warn(
"list_records() is deprecated; use headers instead.",
DeprecationWarning,
)
for header in self.recordarray:
print(header)
return
[docs] def get_nrecords(self):
"""
Return the number of records (headers) in the file.
.. deprecated:: 3.8.0
Use :meth:`len` instead.
"""
warnings.warn(
"get_nrecords is deprecated; use len(obj) instead.",
DeprecationWarning,
)
return len(self)
def _get_data_array(self, totim=0):
"""
Get the three dimensional data array for the
specified kstp and kper value or totim value.
"""
if totim >= 0.0:
keyindices = np.asarray(self.recordarray["totim"] == totim).nonzero()[0]
if len(keyindices) == 0:
raise ValueError(f"totim value ({totim}) not found in file")
else:
raise ValueError("Data not found")
# initialize head with nan and then fill it
idx = keyindices[0]
nrow = self.recordarray["nrow"][idx]
ncol = self.recordarray["ncol"][idx]
data = np.empty((self.nlay, nrow, ncol), dtype=self.realtype)
data[:, :, :] = np.nan
for idx in keyindices:
ipos = self.iposarray[idx]
ilay = self.recordarray["ilay"][idx]
if self.verbose:
msg = f"Byte position in file: {ipos} for layer {ilay}"
print(msg)
self.file.seek(ipos, 0)
nrow = self.recordarray["nrow"][idx]
ncol = self.recordarray["ncol"][idx]
shp = (nrow, ncol)
data[ilay - 1] = self._read_data(shp)
return data
[docs] def get_times(self):
"""
Get a list of unique times in the file
Returns
-------
out : list of floats
List contains unique simulation times (totim) in binary file.
"""
return self.times
[docs] def get_kstpkper(self):
"""
Get a list of unique tuples (stress period, time step) in the file.
Indices are 0-based, use the `kstpkper` attribute for 1-based.
Returns
-------
list of (kstp, kper) tuples
List of unique combinations of stress period &
time step indices (0-based) in the binary file
"""
return [(kstp - 1, kper - 1) for kstp, kper in self.kstpkper]
[docs] def get_data(self, kstpkper=None, idx=None, totim=None, mflay=None):
"""
Get data from the file for the specified conditions.
Parameters
----------
idx : int
The zero-based record number. The first record is record 0.
kstpkper : tuple of ints
A tuple (kstep, kper) of zero-based time step and stress period.
totim : float
The simulation time.
mflay : integer
MODFLOW zero-based layer number to return. If None, then all
all layers will be included. (Default is None.)
Returns
-------
data : numpy array
Array has size (nlay, nrow, ncol) if mflay is None or it has size
(nrow, ncol) if mlay is specified.
Notes
-----
If both kstpkper and totim are None, the last entry will be returned.
"""
# One-based kstp and kper for pulling out of recarray
if kstpkper is not None:
kstp1 = kstpkper[0] + 1
kper1 = kstpkper[1] + 1
idx = np.asarray(
(self.recordarray["kstp"] == kstp1)
& (self.recordarray["kper"] == kper1)
).nonzero()
if idx[0].shape[0] == 0:
raise ValueError(f"get_data() error: kstpkper not found: {kstpkper}")
totim1 = self.recordarray[idx]["totim"][0]
elif totim is not None:
totim1 = totim
elif idx is not None:
totim1 = self.recordarray["totim"][idx]
else:
totim1 = self.times[-1]
data = self._get_data_array(totim1)
if mflay is None:
return data
elif isinstance(data, list): # unstructured model, list form
return data[mflay]
else:
return data[mflay, :, :] # structured model, np.array form
[docs] def get_alldata(self, mflay=None, nodata=-9999):
"""
Get all of the data from the file.
Parameters
----------
mflay : integer
MODFLOW zero-based layer number to return. If None, then all
all layers will be included. (Default is None.)
nodata : float
The nodata value in the data array. All array values that have the
nodata value will be assigned np.nan.
Returns
-------
data : numpy array
Array has size (ntimes, nlay, nrow, ncol) if mflay is None or it
has size (ntimes, nrow, ncol) if mlay is specified.
See Also
--------
Notes
-----
Examples
--------
"""
rv = []
for totim in self.times:
h = self.get_data(totim=totim, mflay=mflay)
rv.append(h)
rv = np.array(rv)
rv[rv == nodata] = np.nan
return rv
def _read_data(self, shp):
"""
Read data from file
"""
raise NotImplementedError(
"Abstract method _read_data called in LayerFile. "
"This method needs to be overridden."
)
def _build_kijlist(self, idx):
"""Build normalized cell index list based on grid type.
Accepts natural index formats for each grid type:
- DIS (structured): (k, i, j) or list of such
- DISV (vertex): (k, cellid) or list of such
- DISU (unstructured): node or list of such
For backwards compatibility, also accepts old 3-tuple format with
dummy values: (k, dummy, cellid) for DISV and (dummy, dummy, node) for DISU.
Returns:
- DIS: list of 3-tuples (k, i, j)
- DISV: list of 2-tuples (k, cellid)
- DISU: list of integers (node)
"""
# Determine grid type
grid_type = "structured" if self.modelgrid is None else self.modelgrid.grid_type
# Normalize idx to a list
if isinstance(idx, int):
idx_list = [idx]
elif isinstance(idx, tuple):
idx_list = [idx]
elif isinstance(idx, list):
idx_list = idx
else:
raise ValueError(f"Could not build kijlist from {idx}")
kijlist = []
for item in idx_list:
if grid_type == "structured":
# DIS: expect 3-tuple (k, i, j)
if not isinstance(item, tuple) or len(item) != 3:
raise ValueError(
f"DIS structured grid requires 3-tuple (layer, row, col), "
f"got: {item}"
)
k, i, j = item
# Validate ranges
if k < 0 or k > self.nlay - 1:
raise ValueError(f"Layer index {k} out of range [0, {self.nlay})")
if i < 0 or i > self.nrow - 1:
raise ValueError(f"Row index {i} out of range [0, {self.nrow})")
if j < 0 or j > self.ncol - 1:
raise ValueError(f"Column index {j} out of range [0, {self.ncol})")
kijlist.append((k, i, j))
elif grid_type == "vertex":
if isinstance(item, tuple):
if len(item) == 2:
# proper format: (layer, cellid)
k, cell = item
elif len(item) == 3:
# old format: (layer, dummy, cellid)
k, cell = item[0], item[2]
else:
raise ValueError(
f"DISV vertex grid requires 2-tuple (layer, cellid) "
f"or 3-tuple (layer, dummy, cellid), got: {item}"
)
else:
raise ValueError(
f"DISV vertex grid requires 2-tuple (layer, cellid) "
f"or 3-tuple (layer, dummy, cellid), got: {item}"
)
if k < 0 or k >= self.nlay:
raise ValueError(f"Layer index {k} out of range [0, {self.nlay})")
if cell < 0 or cell >= self.modelgrid.ncpl:
raise ValueError(
f"Cell index {cell} out of range [0, {self.modelgrid.ncpl})"
)
# Store as 2-tuple for DISV
kijlist.append((k, cell))
else:
if isinstance(item, (int, np.integer)):
# proper format: just the node number
node = int(item)
elif isinstance(item, tuple):
if len(item) == 3:
# old format: (dummy, dummy, node)
node = int(item[2])
elif len(item) == 1:
# Also support single-element tuple
node = int(item[0])
else:
raise ValueError(
f"DISU unstructured grid requires integer node index "
f"or 3-tuple (dummy, dummy, node), got: {item}"
)
else:
raise ValueError(
f"DISU unstructured grid requires integer node index "
f"or 3-tuple (dummy, dummy, node), got: {item}"
)
if node < 0 or node >= self.modelgrid.nnodes:
raise ValueError(
f"Node index {node} out of range [0, {self.modelgrid.nnodes})"
)
# Store as integer for DISU
kijlist.append(node)
return kijlist
def _get_nstation(self, idx, kijlist):
if isinstance(idx, list):
return len(kijlist)
elif isinstance(idx, tuple):
return 1
elif isinstance(idx, (int, np.integer)):
return 1 # Single DISU node
else:
return None
def _init_result(self, nstation):
# Initialize result array and put times in first column
result = np.empty((len(self.times), nstation + 1), dtype=self.realtype)
result[:, :] = np.nan
result[:, 0] = np.array(self.times)
return result
[docs] def close(self):
"""
Close the file handle.
"""
self.file.close()