"""
Module to read MODFLOW binary output files. The module contains four
important classes that can be accessed by the user.
* HeadFile (Binary head file. Can also be used for drawdown)
* HeadUFile (Binary MODFLOW-USG unstructured head file)
* UcnFile (Binary concentration file from MT3DMS)
* CellBudgetFile (Binary cell-by-cell flow file)
"""
import tempfile
import warnings
from os import PathLike
from pathlib import Path
from shutil import move
from typing import Optional, Union
import numpy as np
import pandas as pd
from flopy.discretization.modeltime import ModelTime
from ..datafile import Header, LayerFile
from ..gridutil import get_lni
def _pad_text_to_16(text):
"""
Pad text to exactly 16 bytes, left-justified (MODFLOW standard).
Parameters
----------
text : str or bytes
Text to pad
Returns
-------
bytes
16-byte text field with spaces on the right
"""
if isinstance(text, str):
text = text.encode("ascii")
if len(text) > 16:
return text[:16]
elif len(text) < 16:
return text + b" " * (16 - len(text))
return text
[docs]def binaryread_struct(file, vartype, shape=(1,), charlen=16):
"""
Read text, a scalar value, or an array of values from a binary file.
file : file object
is an open file object
vartype : type
is the return variable type: str, numpy.int32, numpy.float32,
or numpy.float64
shape : tuple
is the shape of the returned array (shape(1, ) returns a single
value) for example, shape = (nlay, nrow, ncol)
charlen : int
is the length of the text string. Note that string arrays
cannot be returned, only multi-character strings. Shape has no
affect on strings.
.. deprecated:: 3.8.0
Use :meth:`binaryread` instead.
"""
import struct
warnings.warn(
"binaryread_struct() is deprecated; use binaryread() instead.",
DeprecationWarning,
)
# store the mapping from type to struct format (fmt)
typefmtd = {np.int32: "i", np.float32: "f", np.float64: "d"}
# read a string variable of length charlen
if vartype == str:
result = file.read(charlen * 1)
# read other variable types
else:
fmt = typefmtd[vartype]
# find the number of bytes for one value
numbytes = vartype(1).nbytes
# find the number of values
nval = np.prod(shape)
fmt = str(nval) + fmt
s = file.read(numbytes * nval)
result = struct.unpack(fmt, s)
if nval == 1:
result = vartype(result[0])
else:
result = np.array(result, dtype=vartype)
result = np.reshape(result, shape)
return result
[docs]def binaryread(file, vartype, shape=(1,), charlen=16):
"""
Read character bytes, scalar or array values from a binary file.
Parameters
----------
file : file object
is an open file object
vartype : type
is the return variable type: bytes, numpy.int32,
numpy.float32, or numpy.float64. Using str is deprecated since
bytes is preferred.
shape : tuple, default (1,)
is the shape of the returned array (shape(1, ) returns a single
value) for example, shape = (nlay, nrow, ncol)
charlen : int, default 16
is the length character bytes. Note that arrays of bytes
cannot be returned, only multi-character bytes. Shape has no
affect on bytes.
Raises
------
EOFError
"""
if vartype == str:
# handle a hang-over from python2
warnings.warn(
"vartype=str is deprecated; use vartype=bytes instead.",
DeprecationWarning,
)
vartype = bytes
if vartype == bytes:
# read character bytes of length charlen
result = file.read(charlen)
if len(result) < charlen:
raise EOFError
else:
# find the number of values
nval = np.prod(shape)
result = np.fromfile(file, vartype, nval)
if result.size < nval:
raise EOFError
if nval != 1:
result = np.reshape(result, shape)
return result
[docs]def join_struct_arrays(arrays):
"""
Simple function that can join two numpy structured arrays.
"""
newdtype = sum((a.dtype.descr for a in arrays), [])
newrecarray = np.empty(len(arrays[0]), dtype=newdtype)
for a in arrays:
for name in a.dtype.names:
newrecarray[name] = a[name]
return newrecarray
[docs]def get_headfile_precision(filename: Union[str, PathLike]):
"""
Determine precision of a MODFLOW head file.
Parameters
----------
filename : str or PathLike
Path of binary MODFLOW file to determine precision.
Returns
-------
str
Result will be unknown, single, or double
"""
# Set default result if neither single or double works
result = "unknown"
# Open file, and check filesize to ensure this is not an empty file
f = open(filename, "rb")
f.seek(0, 2)
totalbytes = f.tell()
f.seek(0, 0) # reset to beginning
assert f.tell() == 0
if totalbytes == 0:
raise ValueError(f"datafile error: file is empty: {filename}")
# first try single
vartype = [
("kstp", "<i4"),
("kper", "<i4"),
("pertim", "<f4"),
("totim", "<f4"),
("text", "S16"),
]
hdr = binaryread(f, vartype)
charbytes = list(hdr[0][4])
if min(charbytes) >= 32 and max(charbytes) <= 126:
# check if bytes are within conventional ASCII range
result = "single"
success = True
else:
success = False
# next try double
if not success:
f.seek(0)
vartype = [
("kstp", "<i4"),
("kper", "<i4"),
("pertim", "<f8"),
("totim", "<f8"),
("text", "S16"),
]
hdr = binaryread(f, vartype)
charbytes = list(hdr[0][4])
if min(charbytes) >= 32 and max(charbytes) <= 126:
result = "double"
else:
f.close()
raise ValueError(
f"Could not determine the precision of the headfile {filename}"
)
# close and return result
f.close()
return result
[docs]def get_concentration_file_type(filename: Union[str, PathLike], precision):
"""
Method to check header and determine if the concentration file is a MT3D like
file or a MF6 GWT like concentration file
Parameters
----------
filename : str or PathLike
Path of binary MODFLOW file to determine precision.
precision : str
double or single
Returns
-------
str
Result will be ucn or head
"""
f = open(filename, "rb")
f.seek(0, 2)
totalbytes = f.tell()
f.seek(0, 0) # reset to beginning
assert f.tell() == 0
if totalbytes == 0:
raise ValueError(f"datafile error: file is empty: {filename}")
floattype = "f4"
if precision == "double":
floattype = "f8"
# first try mt3d ucn
vartype = [
("ntrans", "i4"),
("kstp", "i4"),
("kper", "i4"),
("totim", floattype),
("text", "S16"),
]
hdr = binaryread(f, vartype)
try:
s = hdr[0][4].decode()
if not s.strip().lower().startswith("c"):
success = False
else:
success = True
result = "ucn"
except ValueError:
success = False
if not success:
f.seek(0)
vartype = [
("kstp", "<i4"),
("kper", "<i4"),
("pertim", floattype),
("totim", floattype),
("text", "S16"),
]
hdr = binaryread(f, vartype)
s = hdr[0][4].decode()
if not s.strip().lower().startswith("c"):
f.close()
raise ValueError(
f"Could not determine the header type of the concentration {filename}"
)
else:
result = "head"
return result
[docs]class BinaryLayerFile(LayerFile):
"""
The BinaryLayerFile class is a parent class from which concrete
classes inherit. This class should not be instantiated directly.
Notes
-----
The BinaryLayerFile class is built on a record array consisting of
headers, which are record arrays of the modflow header information
(kstp, kper, pertim, totim, text, nrow, ncol, ilay), and long ints
pointing to the 1st byte of data for the corresponding data arrays.
"""
def __init__(self, filename: Union[str, PathLike], precision, verbose, **kwargs):
super().__init__(filename, precision, verbose, **kwargs)
def _build_index(self):
"""
Build the recordarray and iposarray, which maps the header information
to the position in the binary file.
"""
header = self._get_header()
self.nrow = header["nrow"]
self.ncol = header["ncol"]
if header["ilay"] > self.nlay:
self.nlay = header["ilay"]
if self.nrow < 0 or self.ncol < 0:
raise ValueError("negative nrow, ncol")
warn_threshold = 10000000
if self.nrow > 1 and self.nrow * self.ncol > warn_threshold:
warnings.warn(
f"Very large grid, ncol ({self.ncol}) * nrow ({self.nrow})"
f" > {warn_threshold}"
)
self.file.seek(0, 2)
self.totalbytes = self.file.tell()
self.file.seek(0, 0)
ipos = 0
while ipos < self.totalbytes:
header = self._get_header()
self.recordarray.append(header)
if self.text.upper() not in header["text"]:
continue
if ipos == 0:
self.times.append(header["totim"])
self.kstpkper.append((header["kstp"], header["kper"]))
else:
totim = header["totim"]
if totim != self.times[-1]:
self.times.append(totim)
self.kstpkper.append((header["kstp"], header["kper"]))
ipos = self.file.tell()
self.iposarray.append(ipos)
databytes = self.get_databytes(header)
self.file.seek(databytes, 1)
ipos = self.file.tell()
# self.recordarray contains a recordarray of all the headers.
self.recordarray = np.array(self.recordarray, dtype=self.header_dtype)
self.iposarray = np.array(self.iposarray, dtype=np.int64)
self.nlay = np.max(self.recordarray["ilay"])
# provide headers as a pandas frame
self.headers = pd.DataFrame(self.recordarray, index=self.iposarray)
self.headers["text"] = (
self.headers["text"].str.decode("ascii", "strict").str.strip()
)
[docs] def get_databytes(self, header):
"""
Parameters
----------
header : datafile.Header
header object
Returns
-------
databytes : int
size of the data array, in bytes, following the header
"""
return (
np.int64(header["ncol"])
* np.int64(header["nrow"])
* np.int64(self.realtype(1).nbytes)
)
def _read_data(self, shp):
return binaryread(self.file, self.realtype, shape=shp)
def _get_header(self):
"""
Read the file header
"""
header = binaryread(self.file, self.header_dtype, (1,))
return header[0]
[docs] def get_ts(self, idx):
"""
Get a time series from the binary file.
Parameters
----------
idx : int, tuple of ints, or list of such
Acceptable values depend on grid type:
- Structured grids (DIS): (layer, row, column) or list of such
- Vertex grids (DISV): (layer, cellid) or list of such
- Unstructured grids (DISU): node number or list of such
All indices must be zero-based.
For backwards compatibility, DISV and DISU grids also accept the old
3-tuple format with dummy values: (layer, dummy, cellid) for DISV and
(dummy, dummy, node) for DISU.
Returns
-------
out : numpy array
Array has size (ntimes, ncells + 1). The first column in the
data array will contain time (totim).
See Also
--------
Notes
-----
Index ranges (zero-based):
- DIS: 0 <= layer < nlay, 0 <= row < nrow, 0 <= col < ncol
- DISV: 0 <= layer < nlay, 0 <= cellid < ncpl
- DISU: 0 <= node < nnodes
Examples
--------
>>> # DIS grid: layer 0, row 5, column 5
>>> ts = hds.get_ts(idx=(0, 5, 5))
>>> # DISV grid: layer 0, cell 12
>>> ts = hds.get_ts(idx=(0, 12))
>>> # DISU grid: node 10
>>> ts = hds.get_ts(idx=10)
"""
kijlist = self._build_kijlist(idx)
nstation = self._get_nstation(idx, kijlist)
# Initialize result array and put times in first column
result = self._init_result(nstation)
# Determine grid type
grid_type = "structured" if self.modelgrid is None else self.modelgrid.grid_type
istat = 1
for item in kijlist:
# Unpack based on grid type
if grid_type == "structured":
# DIS: 3-tuple (k, i, j)
k, i, j = item
ioffset = (i * self.ncol + j) * self.realtype(1).nbytes
elif grid_type == "vertex":
# DISV: 2-tuple (k, cellid)
k, cell = item
ioffset = cell * self.realtype(1).nbytes
else:
# DISU: integer (node)
node = item
ioffset = node * self.realtype(1).nbytes
k = 0 # dummy value for DISU
for irec, header in enumerate(self.recordarray):
ilay = header["ilay"] - 1 # change ilay from header to zero-based
# For structured and vertex grids, check layer matches
# For unstructured grids, read from the single "layer" in the file
if grid_type != "unstructured" and ilay != k:
continue
ipos = self.iposarray[irec].item()
# Calculate offset necessary to reach intended cell
self.file.seek(ipos + int(ioffset), 0)
# Find the time index and then put value into result in the
# correct location.
itim = np.asarray(result[:, 0] == header["totim"]).nonzero()[0]
result[itim, istat] = binaryread(self.file, self.realtype)
istat += 1
return result
[docs]class HeadFile(BinaryLayerFile):
"""
The HeadFile class provides simple ways to retrieve and manipulate
2D or 3D head arrays, or time series arrays for one or more cells,
from a binary head output file. A utility method is also provided
to reverse the order of head data, for use with particle tracking
simulations in which particles are tracked backwards in time from
terminating to release locations (e.g., to compute capture zones).
Parameters
----------
filename : str or PathLike
Path of the head file.
text : string
Name of the text string in the head file. Default is 'head'.
precision : string
Precision of floating point head data in the value. Accepted
values are 'auto', 'single' or 'double'. Default is 'auto',
which enables automatic detection of precision.
verbose : bool
Toggle logging output. Default is False.
Examples
--------
>>> import flopy.utils.binaryfile as bf
>>> hdobj = bf.HeadFile('model.hds', precision='single')
>>> hdobj.headers
>>> rec = hdobj.get_data(kstpkper=(0, 49))
>>> ddnobj = bf.HeadFile('model.ddn', text='drawdown', precision='single')
>>> ddnobj.headers
>>> rec = ddnobj.get_data(totim=100.)
"""
def __init__(
self,
filename: Union[str, PathLike],
text="head",
precision="auto",
verbose=False,
**kwargs,
):
self.text = text.encode()
if precision == "auto":
precision = get_headfile_precision(filename)
if precision == "unknown":
raise ValueError(
f"Error. Precision could not be determined for {filename}"
)
self.header_dtype = BinaryHeader.set_dtype(bintype="Head", precision=precision)
super().__init__(filename, precision, verbose, **kwargs)
[docs] @classmethod
def write(
cls,
filename,
data,
nrow=None,
ncol=None,
nlay=None,
ncpl=None,
nnodes=None,
text="head",
precision="double",
totim=None,
pertim=None,
kstpkper=None,
verbose=False,
):
"""
Write head data directly to a binary file.
This classmethod writes head data arrays to a binary head file and returns
a HeadFile instance with the file open.
Parameters
----------
filename : str or PathLike
Path for the output file
data : ndarray, dict, or list
Head data in one of three formats:
1. Array with time dimension:
- Shape (ntimes, nlay, nrow, ncol) or (ntimes, nrow, ncol)
- First dimension is time, creates one record per time step
- Requires kstpkper parameter or uses sequential (1,1), (1,2), (1,3), ...
2. Dict mapping (kstp, kper) tuples to arrays:
{(kstp, kper): array, ...}
- Arrays should be 2D (nrow, ncol) or 3D (nlay, nrow, ncol)
3. List of dicts with full metadata:
[{'data': array, 'kstp': int, 'kper': int,
'totim': float, 'pertim': float, 'ilay': int (optional)}, ...]
- Each dict represents one layer at one timestep
- ilay defaults to 1 if not provided
nrow : int, optional
Number of rows (DIS only). Inferred if None.
ncol : int, optional
Number of columns (DIS only). Inferred if None.
nlay : int, optional
Number of layers (DIS, DISV). Inferred if None.
ncpl : int, optional
Number of cells per layer (DISV only). Inferred if None.
nnodes : int, optional
Total number of nodes (DISU only). Inferred if None.
text : str, default "head"
Text identifier for the head data (will be padded to 16 characters)
precision : str, default "double"
Precision of floating point data: 'single' or 'double'
totim : float, dict, or list, optional
Total time values. Can be:
- float/int: Use same value for all records (only valid with
single timestep)
- dict: Maps (kstp, kper) to totim values
- list: Should match order of data
- None: Defaults to sequential counter (1.0, 2.0, 3.0, ...)
pertim : float, dict, or list, optional
Period time values. Can be:
- float/int: Use same value for all records (only valid with
single timestep)
- dict: Maps (kstp, kper) to pertim values
- list: Should match order of data
- None: Defaults to totim
kstpkper : list of tuples, optional
Time step/period mapping for array data with time dimension.
List of (kstp, kper) tuples, one per time step.
If None, uses sequential numbering: (1,1), (1,2), (1,3), ...
Ignored if data is dict or list format.
verbose : bool, default False
Print progress messages
Returns
-------
HeadFile
Instance with the written file open
Notes
-----
Discretization types are determined by which parameters are provided:
- DIS (structured): nlay, nrow, ncol
- DISV (vertically staggered): nlay, ncpl
- DISU (unstructured): nnodes
Examples
--------
>>> import numpy as np
>>> from flopy.utils import HeadFile
>>>
>>> # Write head data for two time steps
>>> head1 = np.random.rand(3, 10, 20) # 3 layers, 10 rows, 20 cols
>>> head2 = np.random.rand(3, 10, 20)
>>> data = {
... (1, 1): head1,
... (1, 2): head2,
... }
>>> hds = HeadFile.write('output.hds', data)
>>> hds.get_times()
[1.0, 2.0]
>>>
>>> # Or with explicit time values
>>> data_with_times = [
... {'data': head1, 'kstp': 1, 'kper': 1, 'totim': 10.0, 'pertim': 10.0},
... {'data': head2, 'kstp': 1, 'kper': 2, 'totim': 20.0, 'pertim': 10.0},
... ]
>>> HeadFile.write('output.hds', data_with_times)
"""
# xarray duck typing - extract underlying numpy array
if hasattr(data, "values") and hasattr(data, "dims"):
data = data.values
# Scalar handling - broadcast to shape
if isinstance(data, (int, float, np.number)):
# Determine shape from grid parameters
if nnodes is not None:
shape = (nnodes,)
elif ncpl is not None and nlay is not None:
shape = (nlay, ncpl)
elif nrow is not None and ncol is not None:
if nlay is not None:
shape = (nlay, nrow, ncol)
else:
shape = (nrow, ncol)
else:
raise ValueError(
"Must provide grid dimensions (nlay/nrow/ncol, ncpl, or nnodes) "
"when using scalar data"
)
# Default to single timestep if kstpkper not provided
if kstpkper is None:
kstpkper = [(1, 1)]
# Create array with time dimension
realtype = np.float32 if precision == "single" else np.float64
arr = np.full((len(kstpkper),) + shape, data, dtype=realtype)
data = arr
# List handling - convert list of arrays to array with time dimension
if isinstance(data, list):
if not data:
raise ValueError("Empty data list")
# Check if it's list of dicts (already supported) or list of arrays
if isinstance(data[0], dict):
# List of dicts - let existing code handle it
pass
else:
# List of arrays - convert to numpy array with time dimension
# First extract .values from any xarray elements
arrays = []
for elem in data:
if hasattr(elem, "values") and hasattr(elem, "dims"):
arrays.append(elem.values)
else:
arrays.append(elem)
try:
data = np.array(arrays)
except Exception as e:
raise ValueError(f"Could not convert list to array: {e}")
# Handle array with time dimension - convert to dict format
if isinstance(data, np.ndarray):
arr = data
ntimes = arr.shape[0]
# Generate or validate kstpkper
if kstpkper is None:
# Sequential stress periods: (1,1), (1,2), (1,3), ...
kstpkper = [(1, i) for i in range(1, ntimes + 1)]
elif len(kstpkper) != ntimes:
raise ValueError(
f"kstpkper must have {ntimes} entries to match "
f"time dimension, got {len(kstpkper)}"
)
# Convert to dict format
data = {}
for i, (kstp, kper) in enumerate(kstpkper):
data[(kstp, kper)] = arr[i]
# Normalize data to list of record dicts
if isinstance(data, dict):
# Validate that single-value times are only used with single timestep
if len(data) > 1:
if isinstance(totim, (int, float)):
raise ValueError(
"totim cannot be a single value when data has "
"multiple time steps. Use a dict mapping (kstp, "
"kper) to time values, or pass data with a single "
"time step."
)
if isinstance(pertim, (int, float)):
raise ValueError(
"pertim cannot be a single value when data has "
"multiple time steps. Use a dict mapping (kstp, "
"kper) to time values, or pass data with a single "
"time step."
)
records = []
for i, ((kstp, kper), arr) in enumerate(sorted(data.items()), start=1):
# Handle xarray in dict values
if hasattr(arr, "values") and hasattr(arr, "dims"):
arr = arr.values
arr = np.asarray(arr)
# Allow 1D arrays for DISV/DISU, require 2D+ for DIS
if arr.ndim == 1:
# 1D array - valid for DISV (ncpl) or DISU (nnodes)
if ncpl is not None or nnodes is not None:
# Single layer for DISV/DISU - reshape to (1, ncells)
nlayers = 1
ncells = arr.shape[0]
arr = arr.reshape(1, ncells)
else:
raise ValueError(
"1D arrays require ncpl or nnodes parameter. "
"For DIS grids, use 2D (nrow, ncol) or 3D "
"(nlay, nrow, ncol) arrays."
)
elif arr.ndim == 2:
# 2D array
if ncpl is not None:
# DISV: (nlay, ncpl) - already in right shape
nlayers = arr.shape[0]
ncells = arr.shape[1]
elif nnodes is not None:
# DISU: shouldn't have 2D, but treat as single layer
nlayers = 1
ncells = arr.size
arr = arr.reshape(1, ncells)
else:
# DIS: single layer (nrow, ncol) - reshape to (1, nrow, ncol)
nlayers = 1
nrows = arr.shape[0]
ncols = arr.shape[1]
arr = arr.reshape(1, arr.shape[0], arr.shape[1])
else:
# 3D array (nlay, nrow, ncol) for DIS
nlayers = arr.shape[0]
nrows = arr.shape[1]
ncols = arr.shape[2]
# Get time values
if totim is None:
tot = float(i)
elif isinstance(totim, dict):
tot = totim.get((kstp, kper), float(i))
elif isinstance(totim, (int, float)):
tot = float(totim)
else:
raise ValueError("totim must be None, number, or dict")
if pertim is None:
per = tot
elif isinstance(pertim, dict):
per = pertim.get((kstp, kper), tot)
elif isinstance(pertim, (int, float)):
per = float(pertim)
else:
raise ValueError("pertim must be None, number, or dict")
# Create one record per layer
for ilay in range(nlayers):
records.append(
{
"data": arr[ilay],
"kstp": kstp,
"kper": kper,
"totim": tot,
"pertim": per,
"ilay": ilay + 1,
}
)
elif isinstance(data, list):
records = []
for rec in data:
arr = rec["data"]
# Handle xarray in list elements
if hasattr(arr, "values") and hasattr(arr, "dims"):
arr = arr.values
arr = np.asarray(arr)
if arr.ndim == 1:
raise ValueError("Data arrays must be at least 2D")
# Handle 2D vs 3D
if arr.ndim == 2:
# Single layer record
records.append(
{
"data": arr,
"kstp": rec["kstp"],
"kper": rec["kper"],
"totim": rec.get("totim", float(rec["kper"])),
"pertim": rec.get(
"pertim", rec.get("totim", float(rec["kper"]))
),
"ilay": rec.get("ilay", 1),
}
)
else:
# 3D array - create one record per layer
nlayers = arr.shape[0]
for ilay in range(nlayers):
records.append(
{
"data": arr[ilay],
"kstp": rec["kstp"],
"kper": rec["kper"],
"totim": rec.get("totim", float(rec["kper"])),
"pertim": rec.get(
"pertim", rec.get("totim", float(rec["kper"]))
),
"ilay": ilay + 1,
}
)
else:
raise ValueError("data must be dict or list")
if len(records) == 0:
raise ValueError("No data records provided")
# Determine discretization type and infer dimensions
if nnodes is not None:
# DISU (unstructured)
dis_type = "DISU"
nlay = 1
nrow = 1
ncol = nnodes
expected_shape = (nnodes,)
elif ncpl is not None:
# DISV (vertically staggered)
dis_type = "DISV"
if nlay is None:
nlay = max(rec["ilay"] for rec in records)
nrow = 1
ncol = ncpl
expected_shape = (ncpl,)
else:
# DIS (structured) - default
dis_type = "DIS"
first_data = records[0]["data"]
if nrow is None:
nrow = first_data.shape[0]
if ncol is None:
ncol = first_data.shape[1]
if nlay is None:
nlay = max(rec["ilay"] for rec in records)
expected_shape = (nrow, ncol)
# Validate dimensions
for rec in records:
if rec["data"].shape != expected_shape:
raise ValueError(
f"Inconsistent array shapes: expected {expected_shape}, "
f"got {rec['data'].shape}"
)
# Set precision dtype
realtype = np.float32 if precision == "single" else np.float64
# Pad text to 16 bytes
text = _pad_text_to_16(text)
# Create temporary file if no filename provided
if filename is None:
# Create a temp file that won't be auto-deleted
fd, filename = tempfile.mkstemp(suffix=".hds")
import os
os.close(fd) # Close the file descriptor, we'll open it for writing
# Write binary file
if verbose:
print(f"Writing binary head file: {filename}")
print(f" Text identifier: {text.decode().strip()}")
print(f" Precision: {precision}")
print(f" Discretization: {dis_type}")
if dis_type == "DIS":
print(f" Dimensions: {nlay} layers, {nrow} rows, {ncol} columns")
elif dis_type == "DISV":
print(f" Dimensions: {nlay} layers, {ncpl} cells per layer")
elif dis_type == "DISU":
print(f" Dimensions: {nnodes} nodes (unstructured)")
print(f" Number of records: {len(records)}")
# Use BinaryHeader.create() and write like Util2d.write_bin() does
with open(filename, "wb") as f:
for rec in records:
# Create header using BinaryHeader.create()
header = BinaryHeader.create(
bintype="Head",
precision=precision,
text=text.decode().strip(),
nrow=nrow,
ncol=ncol,
ilay=rec["ilay"],
pertim=rec["pertim"],
totim=rec["totim"],
kstp=rec["kstp"],
kper=rec["kper"],
)
# Write header and data
header.tofile(f)
rec["data"].astype(realtype).tofile(f)
# Explicitly flush and sync to ensure data is written
f.flush()
import os
os.fsync(f.fileno())
# Return an instance with the file open
return cls(filename, precision=precision, verbose=verbose)
[docs] def reverse(self, filename: Optional[PathLike] = None):
"""
Reverse the time order of the currently loaded binary head file. If a head
file name is not provided or the provided name is the same as the existing
filename, the file will be overwritten and reloaded.
Parameters
----------
filename : str or PathLike, optional
Path of the reversed binary head file.
"""
filename = (
Path(filename).expanduser().absolute()
if filename is not None
else self.filename
)
time = ModelTime.from_headers(self.recordarray)
time._set_totim_dict()
trev = time.reverse()
trev._set_totim_dict()
nper = time.nper
seen = set()
def reverse_header(header):
"""Reverse period, step and time fields in the record header"""
nonlocal seen
kper = header["kper"] - 1
kstp = header["kstp"] - 1
header = header.copy()
header["kper"] = nper - kper
header["kstp"] = time.nstp[kper] - kstp
kper = header["kper"] - 1
kstp = header["kstp"] - 1
seen.add((kper, kstp))
header["pertim"] = trev._pertim_dict[kper, kstp]
header["totim"] = trev._totim_dict[kper, kstp]
return header
target = filename
# if rewriting the same file, write
# temp file then copy it into place
inplace = filename == self.filename
if inplace:
temp_dir_path = Path(tempfile.gettempdir())
temp_file_path = temp_dir_path / filename.name
target = temp_file_path
# reverse record order
with open(target, "wb") as f:
for i in range(len(self) - 1, -1, -1):
header = self.recordarray[i].copy()
header = reverse_header(header)
text = header["text"]
ilay = header["ilay"]
kstp = header["kstp"]
kper = header["kper"]
pertim = header["pertim"]
totim = header["totim"]
data = self.get_data(idx=i)[ilay - 1]
dt = np.dtype(
[
("kstp", np.int32),
("kper", np.int32),
("pertim", np.float64),
("totim", np.float64),
("text", "S16"),
("ncol", np.int32),
("nrow", np.int32),
("ilay", np.int32),
]
)
nrow = data.shape[0]
ncol = data.shape[1]
h = np.array(
(kstp, kper, pertim, totim, text, ncol, nrow, ilay), dtype=dt
)
h.tofile(f)
data.tofile(f)
# if we rewrote the original file, reinitialize
if inplace:
move(target, filename)
super().__init__(filename, self.precision, self.verbose)
[docs] def export(
self,
filename: Union[str, PathLike],
kstpkper: Optional[list] = None,
**kwargs,
):
"""
Export head data to a binary file.
Parameters
----------
filename : str or PathLike
Path to output head file
kstpkper : list of tuples, optional
Subset of (kstp, kper) tuples to export. If None, exports all time steps.
**kwargs
Additional keyword arguments:
- text : str, identifier for head data (default uses current file's text)
- precision : str, 'single' or 'double' (default is the file's precision)
- verbose : bool, print progress messages
Examples
--------
>>> hds = HeadFile('input.hds')
>>> # Export all time steps
>>> hds.export('output.hds')
>>> # Export specific time steps
>>> hds.export('output.hds', kstpkper=[(1, 0), (1, 1)])
"""
# Determine which time steps to write
if kstpkper is None:
kstpkper = self.kstpkper
# Set defaults from current file if not provided
text = kwargs.get("text")
if text is None:
text = self.recordarray["text"][0].decode().strip()
precision = kwargs.get("precision", self.precision)
verbose = kwargs.get("verbose", False)
# Set precision
realtype = np.float32 if precision == "single" else np.float64
# Pad text to 16 bytes
text_bytes = _pad_text_to_16(text)
# Pre-allocate header dtype outside loop for better performance
dt = np.dtype(
[
("kstp", np.int32),
("kper", np.int32),
("pertim", realtype),
("totim", realtype),
("text", "S16"),
("ncol", np.int32),
("nrow", np.int32),
("ilay", np.int32),
]
)
# Sort kstpkper upfront for correct output order
sorted_kstpkper = sorted(kstpkper, key=lambda x: (int(x[0]), int(x[1])))
if verbose:
print(f"Writing binary head file: {filename}")
print(f" Text identifier: {text_bytes.decode().strip()}")
print(f" Precision: {precision}")
print(f" Number of time steps: {len(sorted_kstpkper)}")
# Write the file
with open(filename, "wb") as f:
for ksp in sorted_kstpkper:
try:
# Convert numpy int32 to Python int if needed
kstp = int(ksp[0])
kper = int(ksp[1])
# Find the totim for this kstpkper
mask = (self.recordarray["kstp"] == kstp) & (
self.recordarray["kper"] == kper
)
matching_records = self.recordarray[mask]
if len(matching_records) == 0:
if verbose:
print(f"Warning: No records found for {ksp}")
continue
record = matching_records[0]
totim = float(record["totim"])
pertim = float(record["pertim"])
# Get data using totim (works for multi-layer files)
head = np.asarray(self.get_data(totim=totim))
# Handle both 3D (nlay, nrow, ncol) and 2D (nrow, ncol) arrays
if head.ndim == 2:
head = head.reshape(1, head.shape[0], head.shape[1])
nlay, nrow, ncol = head.shape
if verbose:
print(f" Writing kstp={kstp}, kper={kper}, totim={totim}")
print(f" Shape: {nlay} layers x {nrow} rows x {ncol} cols")
# Write one record per layer
for ilay in range(nlay):
h = np.array(
(
kstp,
kper,
pertim,
totim,
text_bytes,
ncol,
nrow,
ilay + 1,
),
dtype=dt,
)
h.tofile(f)
head[ilay].astype(realtype).tofile(f)
except Exception as e:
if verbose:
print(f"Warning: Could not read data for {ksp}: {e}")
continue
if verbose:
print(f"Successfully wrote {filename}")
[docs]class UcnFile(BinaryLayerFile):
"""
UcnFile Class.
Parameters
----------
filename : string
Name of the concentration file
text : string
Name of the text string in the ucn file. Default is 'CONCENTRATION'
precision : string
'auto', 'single' or 'double'. Default is 'auto'.
verbose : bool
Write information to the screen. Default is False.
Attributes
----------
Methods
-------
See Also
--------
Notes
-----
The UcnFile class provides simple ways to retrieve 2d and 3d
concentration arrays from a MT3D binary head file and time series
arrays for one or more cells.
The BinaryLayerFile class is built on a record array consisting of
headers, which are record arrays of the modflow header information
(kstp, kper, pertim, totim, text, nrow, ncol, ilay)
and long integers, which are pointers to first bytes of data for
the corresponding data array.
Examples
--------
>>> import flopy.utils.binaryfile as bf
>>> ucnobj = bf.UcnFile('MT3D001.UCN', precision='single')
>>> ucnobj.headers
>>> rec = ucnobj.get_data(kstpkper=(0, 0))
"""
def __init__(
self,
filename,
text="concentration",
precision="auto",
verbose=False,
**kwargs,
):
self.text = text.encode()
if precision == "auto":
precision = get_headfile_precision(filename)
if precision == "unknown":
raise ValueError(f"Error. Precision could not be determined for {filename}")
bintype = get_concentration_file_type(filename, precision)
self.header_dtype = BinaryHeader.set_dtype(bintype=bintype, precision=precision)
super().__init__(filename, precision, verbose, **kwargs)
return
[docs]class HeadUFile(BinaryLayerFile):
"""
The HeadUFile class provides simple ways to retrieve a list of
head arrays from a MODFLOW-USG binary head file and time series
arrays for one or more cells.
Parameters
----------
filename : str or PathLike
Path of the head file
text : string
Name of the text string in the head file. Default is 'headu'.
precision : string
Precision of the floating point head data in the file. Accepted
values are 'auto', 'single' or 'double'. Default is 'auto', which
enables precision to be automatically detected.
verbose : bool
Toggle logging output. Default is False.
Notes
-----
The BinaryLayerFile class is built on a record array consisting of
headers, which are record arrays of the modflow header information
(kstp, kper, pertim, totim, text, nrow, ncol, ilay), and long ints
pointing to the 1st byte of data for the corresponding data arrays.
This class overrides methods in the parent class so that the proper
sized arrays are created: for unstructured grids, nrow and ncol are
the starting and ending node numbers for layer, ilay.
When the get_data method is called for this class, a list of
one-dimensional arrays will be returned, where each array is the head
array for a layer. If the heads for a layer were not saved, then
None will be returned for that layer.
Examples
--------
>>> import flopy.utils.binaryfile as bf
>>> hdobj = bf.HeadUFile('model.hds')
>>> hdobj.headers
>>> usgheads = hdobj.get_data(kstpkper=(0, 49))
"""
def __init__(
self,
filename: Union[str, PathLike],
text="headu",
precision="auto",
verbose=False,
**kwargs,
):
"""
Class constructor
"""
self.text = text.encode()
if precision == "auto":
precision = get_headfile_precision(filename)
if precision == "unknown":
raise ValueError(
f"Error. Precision could not be determined for {filename}"
)
self.header_dtype = BinaryHeader.set_dtype(bintype="Head", precision=precision)
super().__init__(filename, precision, verbose, **kwargs)
def _get_data_array(self, totim=0.0):
"""
Get a list of 1D arrays for the
specified kstp and kper value or totim value.
"""
if totim >= 0.0:
keyindices = np.asarray(self.recordarray["totim"] == totim).nonzero()[0]
if len(keyindices) == 0:
raise ValueError(f"totim value ({totim}) not found in file")
else:
raise ValueError("Data not found")
# fill a list of 1d arrays with heads from binary file
data = self.nlay * [None]
for idx in keyindices:
ipos = self.iposarray[idx]
ilay = self.recordarray["ilay"][idx]
nstrt = self.recordarray["ncol"][idx]
nend = self.recordarray["nrow"][idx]
npl = nend - nstrt + 1
if self.verbose:
print(f"Byte position in file: {ipos} for layer {ilay}")
self.file.seek(ipos, 0)
data[ilay - 1] = binaryread(self.file, self.realtype, shape=(npl,))
return data
[docs] def get_databytes(self, header):
"""
Parameters
----------
header : datafile.Header
header object
Returns
-------
databytes : int
size of the data array, in bytes, following the header
"""
# unstructured head files contain node starting and ending indices
# for each layer
nstrt = np.int64(header["ncol"])
nend = np.int64(header["nrow"])
npl = nend - nstrt + 1
return npl * np.int64(self.realtype(1).nbytes)
[docs] def get_ts(self, idx):
"""
Get a time series from the binary HeadUFile
Parameters
----------
idx : int or list of ints
idx can be nodenumber or it can be a list in the form
[nodenumber, nodenumber, ...]. The nodenumber,
values must be zero based.
Returns
-------
out : numpy array
Array has size (ntimes, ncells + 1). The first column in the
data array will contain time (totim).
"""
times = self.get_times()
data = self.get_data(totim=times[0])
layers = len(data)
ncpl = [len(data[l]) for l in range(layers)]
result = []
if isinstance(idx, int):
layer, nn = get_lni(ncpl, [idx])[0]
for i, time in enumerate(times):
data = self.get_data(totim=time)
value = data[layer][nn]
result.append([time, value])
elif isinstance(idx, list) and all(isinstance(x, int) for x in idx):
for i, time in enumerate(times):
data = self.get_data(totim=time)
row = [time]
lni = get_lni(ncpl, idx)
for layer, nn in lni:
value = data[layer][nn]
row += [value]
result.append(row)
else:
raise ValueError("idx must be an integer or a list of integers")
return np.array(result)
[docs] def get_alldata(self, mflay=None, nodata=-9999):
"""
Get all data from the USG head file.
Parameters
----------
mflay : integer
MODFLOW zero-based layer number to return. For USG files, this
parameter is required (cannot be None). (Default is None.)
nodata : float
The nodata value in the data array. All array values that have the
nodata value will be assigned np.nan.
Returns
-------
data : numpy array
Array has size (ntimes, ncells_in_layer) when mflay is specified.
Raises
------
NotImplementedError
Raised when mflay=None. USG head files contain ragged arrays with
variable-sized data per layer, which cannot be converted to a
uniform numpy array when retrieving all layers.
Notes
-----
For MODFLOW-USG files, the mflay parameter must be specified to retrieve
data for a single layer across all timesteps. To get all layers for a
specific timestep, use get_data() instead.
"""
if mflay is None:
raise NotImplementedError(
"get_alldata() with mflay=None is not supported for"
"MODFLOW-USG head files. These contain variably-size "
"data per layer which cannot be stacked into a numpy "
"array. Specify mflay to get data for a single layer "
"or use get_data() for specific timesteps."
)
return super().get_alldata(mflay=mflay, nodata=nodata)
[docs]class BudgetIndexError(Exception):
pass
[docs]class CellBudgetFile:
"""
The CellBudgetFile class provides convenient ways to retrieve and
manipulate budget data from a binary cell budget file. A utility
method is also provided to reverse the budget records for particle
tracking simulations in which particles are tracked backwards from
terminating to release locations (e.g., to compute capture zones).
Parameters
----------
filename : str or PathLike
Path of the cell budget file.
precision : string
Precision of floating point budget data in the file. Accepted
values are 'single' or 'double'. Default is 'single'.
verbose : bool
Toggle logging output. Default is False.
Examples
--------
>>> import flopy.utils.binaryfile as bf
>>> cbb = bf.CellBudgetFile('mymodel.cbb')
>>> cbb.headers
>>> rec = cbb.get_data(kstpkper=(0,0), text='RIVER LEAKAGE')
"""
def __init__(
self,
filename: Union[str, PathLike],
precision="auto",
verbose=False,
**kwargs,
):
self.filename = Path(filename).expanduser().absolute()
self.precision = precision
self.verbose = verbose
self.file = open(self.filename, "rb")
# Get filesize to ensure this is not an empty file
self.file.seek(0, 2)
totalbytes = self.file.tell()
self.file.seek(0, 0) # reset to beginning
assert self.file.tell() == 0
if totalbytes == 0:
raise ValueError(f"datafile error: file is empty: {filename}")
self.nrow = 0
self.ncol = 0
self.nlay = 0
self.nper = 0
self.times = []
self.kstpkper = []
self.recordarray = []
self.iposheader = []
self.iposarray = []
self.textlist = []
self.imethlist = []
self.paknamlist_from = []
self.paknamlist_to = []
self.compact = True # compact budget file flag
self.dis = None
self.modelgrid = None
if "model" in kwargs.keys():
self.model = kwargs.pop("model")
self.modelgrid = self.model.modelgrid
self.dis = self.model.dis
if "dis" in kwargs.keys():
self.dis = kwargs.pop("dis")
self.modelgrid = self.dis.parent.modelgrid
if "tdis" in kwargs.keys():
self.tdis = kwargs.pop("tdis")
if "modelgrid" in kwargs.keys():
self.modelgrid = kwargs.pop("modelgrid")
if len(kwargs.keys()) > 0:
args = ",".join(kwargs.keys())
raise ValueError(f"LayerFile error: unrecognized kwargs: {args}")
if precision == "auto":
success = self._set_precision("single")
if not success:
success = self._set_precision("double")
if not success:
s = "Budget precision could not be auto determined"
raise BudgetIndexError(s)
elif precision == "single":
success = self._set_precision(precision)
elif precision == "double":
success = self._set_precision(precision)
else:
raise ValueError(f"Unknown precision specified: {precision}")
# set shape for full3D option
if self.modelgrid is None:
self.shape = (self.nlay, self.nrow, self.ncol)
self.nnodes = self.nlay * self.nrow * self.ncol
else:
self.shape = self.modelgrid.shape
self.nnodes = self.modelgrid.nnodes
if not success:
raise ValueError(
f"Budget file could not be read using {precision} precision"
)
def __enter__(self):
return self
def __exit__(self, *exc):
self.close()
[docs] @classmethod
def write(
cls,
filename,
data,
text="FLOW-JA-FACE",
imeth=1,
precision="double",
delt=1.0,
pertim=None,
totim=None,
nlay=None,
nrow=None,
ncol=None,
ncpl=None,
nnodes=None,
kstpkper=None,
verbose=False,
):
"""
Write budget data directly to a binary file.
This classmethod writes budget data arrays to a binary cell budget
file and returns a CellBudgetFile instance with the file open.
Parameters
----------
filename : str or PathLike
Path for the output file
data : ndarray, dict, or list
Budget data in one of three formats:
1. Array with time dimension:
- Shape (ntimes, nlay, nrow, ncol) or (ntimes, ...) for grid data
- First dimension is time, creates one record per time step
- Requires kstpkper parameter or uses sequential (1,1), (1,2), (1,3), ...
2. Dict mapping (kstp, kper) tuples to arrays:
{(kstp, kper): array, ...}
- For imeth=1: arrays should be 1D (flattened cell-by-cell data)
3. List of dicts with full metadata:
[{'data': array, 'kstp': int, 'kper': int, 'text': str (optional),
'totim': float (optional), 'pertim': float (optional),
'delt': float (optional), 'imeth': int (optional)}, ...]
- Allows per-record customization of all parameters
text : str or list, default "FLOW-JA-FACE"
Budget text identifier (will be padded to 16 characters).
If list, must match length of data records.
imeth : int, default 1
Method code:
- 1: Full 3D array (most common)
- 6: List-based budget (for MF6 advanced packages)
precision : str, default "double"
Precision of floating point data: 'single' or 'double'
delt : float or dict, default 1.0
Time step length. Can be:
- float/int: Use same value for all records
- dict: Maps (kstp, kper) to delt values
pertim : float or dict, optional
Period time. Can be:
- float/int: Use same value for all records (only valid with
single timestep)
- dict: Maps (kstp, kper) to pertim values
- None: Defaults to totim
totim : float or dict, optional
Total simulation time. Can be:
- float/int: Use same value for all records (only valid with
single timestep)
- dict: Maps (kstp, kper) to totim values
- None: Defaults to sequential counter (1.0, 2.0, 3.0, ...)
nlay : int, optional
Number of layers (DIS, DISV). Inferred if None.
nrow : int, optional
Number of rows (DIS only). Inferred if None.
ncol : int, optional
Number of columns (DIS only). Inferred if None.
ncpl : int, optional
Number of cells per layer (DISV only). Inferred if None.
nnodes : int, optional
Total number of nodes (DISU only). Inferred if None.
kstpkper : list of tuples, optional
Time step/period mapping for array data with time dimension.
List of (kstp, kper) tuples, one per time step.
If None, uses sequential numbering: (1,1), (1,2), (1,3), ...
Ignored if data is dict or list format.
verbose : bool, default False
Print progress messages
Returns
-------
CellBudgetFile
Instance with the written file open
Notes
-----
Discretization types are determined by which parameters are provided:
- DIS (structured): nlay, nrow, ncol
- DISV (vertically staggered): nlay, ncpl
- DISU (unstructured): nnodes
If no dimensions are provided, they will be inferred from array shapes.
Examples
--------
>>> import numpy as np
>>> from flopy.utils import CellBudgetFile
>>>
>>> # Write budget data for two time steps
>>> flow1 = np.random.rand(6000)
>>> flow2 = np.random.rand(6000)
>>> data = {
... (1, 1): flow1,
... (1, 2): flow2,
... }
>>> cbb = CellBudgetFile.write(
... 'output.cbc', data, text='FLOW-JA-FACE', nlay=3, nrow=10, ncol=20
... )
>>>
>>> # Or with list of records (dicts)
>>> CellBudgetFile.write('output.cbc', [
... {'data': flow1, 'kstp': 1, 'kper': 1, 'totim': 10.0,
... 'text': 'FLOW-JA-FACE'},
... {'data': flow2, 'kstp': 1, 'kper': 2, 'totim': 20.0,
... 'text': 'FLOW-JA-FACE'},
... ], nlay=3, nrow=10, ncol=20)
"""
# xarray duck typing - extract underlying numpy array
if hasattr(data, "values") and hasattr(data, "dims"):
data = data.values
# Scalar handling - broadcast to shape
if isinstance(data, (int, float, np.number)):
# Determine shape from grid parameters
if nnodes is not None:
shape = (nnodes,)
elif ncpl is not None and nlay is not None:
shape = (nlay, ncpl)
elif nrow is not None and ncol is not None:
if nlay is not None:
shape = (nlay, nrow, ncol)
else:
shape = (nrow, ncol)
else:
raise ValueError(
"Must provide grid dimensions (nlay/nrow/ncol, ncpl, or nnodes) "
"when using scalar data"
)
# Default to single timestep if kstpkper not provided
if kstpkper is None:
kstpkper = [(1, 1)]
# Create array with time dimension
realtype = np.float32 if precision == "single" else np.float64
arr = np.full((len(kstpkper),) + shape, data, dtype=realtype)
data = arr
# List handling - convert list of arrays to array with time dimension
if isinstance(data, list):
if not data:
raise ValueError("Empty data list")
# Check if it's list of dicts (already supported) or list of arrays
if isinstance(data[0], dict):
# List of dicts - let existing code handle it
pass
else:
# List of arrays - convert to numpy array with time dimension
# First extract .values from any xarray elements
arrays = []
for elem in data:
if hasattr(elem, "values") and hasattr(elem, "dims"):
arrays.append(elem.values)
else:
arrays.append(elem)
try:
data = np.array(arrays)
except Exception as e:
raise ValueError(f"Could not convert list to array: {e}")
# Handle array with time dimension - convert to dict format
if isinstance(data, np.ndarray):
arr = data
ntimes = arr.shape[0]
# Generate or validate kstpkper
if kstpkper is None:
# Sequential stress periods: (1,1), (1,2), (1,3), ...
kstpkper = [(1, i) for i in range(1, ntimes + 1)]
elif len(kstpkper) != ntimes:
raise ValueError(
f"kstpkper must have {ntimes} entries to match "
f"time dimension, got {len(kstpkper)}"
)
# Convert to dict format
data = {}
for i, (kstp, kper) in enumerate(kstpkper):
data[(kstp, kper)] = arr[i]
# Normalize data to list of record dicts
if isinstance(data, dict):
# Validate that single-value times are only used with single timestep
if len(data) > 1:
if isinstance(totim, (int, float)):
raise ValueError(
"totim cannot be a single value when data has "
"multiple time steps. Use a dict mapping (kstp, "
"kper) to time values, or pass data with a single "
"time step."
)
if isinstance(pertim, (int, float)):
raise ValueError(
"pertim cannot be a single value when data has "
"multiple time steps. Use a dict mapping (kstp, "
"kper) to time values, or pass data with a single "
"time step."
)
if isinstance(delt, (int, float)) and delt != 1.0:
raise ValueError(
"delt cannot be a single non-default value when "
"data has multiple time steps. Use a dict mapping "
"(kstp, kper) to delt values, or pass data with a "
"single time step."
)
records = []
inferred_shape = None # Track shape from first shaped array
for i, ((kstp, kper), arr) in enumerate(sorted(data.items()), start=1):
# Handle xarray in dict values
if hasattr(arr, "values") and hasattr(arr, "dims"):
arr = arr.values
arr = np.asarray(arr)
# Capture shape before flattening for dimension inference
if arr.ndim in (2, 3) and inferred_shape is None:
inferred_shape = arr.shape
arr = arr.flatten()
# Get time values
if totim is None:
tot = float(i)
elif isinstance(totim, dict):
tot = totim.get((kstp, kper), float(i))
elif isinstance(totim, (int, float)):
tot = float(totim)
else:
raise ValueError("totim must be None, number, or dict")
if pertim is None:
per = tot
elif isinstance(pertim, dict):
per = pertim.get((kstp, kper), tot)
elif isinstance(pertim, (int, float)):
per = float(pertim)
else:
raise ValueError("pertim must be None, number, or dict")
if isinstance(delt, dict):
dt = delt.get((kstp, kper), 1.0)
elif isinstance(delt, (int, float)):
dt = float(delt)
else:
raise ValueError("delt must be number or dict")
# Get text for this record
if isinstance(text, list):
rec_text = text[len(records)]
else:
rec_text = text
records.append(
{
"data": arr,
"kstp": kstp,
"kper": kper,
"totim": tot,
"pertim": per,
"delt": dt,
"text": rec_text,
"imeth": imeth,
}
)
# Use inferred shape if dimensions not provided
if inferred_shape is not None:
# Determine if user is trying to use DISV (nlay + ncpl)
if ncpl is not None or (
nlay is not None and nrow is None and ncol is None
):
# DISV mode
if len(inferred_shape) == 2:
# 2D array: (nlay, ncpl)
nlay = nlay or inferred_shape[0]
ncpl = ncpl or inferred_shape[1]
elif len(inferred_shape) == 1:
# 1D array: assume nlay=1, infer ncpl
nlay = nlay or 1
ncpl = ncpl or inferred_shape[0]
elif nnodes is None:
# DIS mode (default)
if nlay is None or nrow is None or ncol is None:
if len(inferred_shape) == 3:
# 3D array: (nlay, nrow, ncol)
nlay = nlay or inferred_shape[0]
nrow = nrow or inferred_shape[1]
ncol = ncol or inferred_shape[2]
elif len(inferred_shape) == 2:
# 2D array: (nrow, ncol), assume nlay=1
nlay = nlay or 1
nrow = nrow or inferred_shape[0]
ncol = ncol or inferred_shape[1]
elif isinstance(data, list):
records = []
inferred_shape = None # Track shape from first shaped array
for rec in data:
rec_imeth = rec.get("imeth", imeth)
arr = rec["data"]
# Handle xarray in list elements
if hasattr(arr, "values") and hasattr(arr, "dims"):
arr = arr.values
arr = np.asarray(arr)
# For imeth=1, capture shape before flattening for dimension inference
if rec_imeth == 1:
rec_text = rec.get("text", text).strip()
# Only infer from non-FLOW-JA-FACE data (connection-based)
if rec_text != "FLOW-JA-FACE" and arr.ndim in (2, 3):
if inferred_shape is None:
inferred_shape = arr.shape
arr = arr.flatten()
records.append(
{
"data": arr,
"kstp": rec["kstp"],
"kper": rec["kper"],
"totim": rec.get("totim", float(rec["kper"])),
"pertim": rec.get(
"pertim", rec.get("totim", float(rec["kper"]))
),
"delt": rec.get("delt", 1.0),
"text": rec.get("text", text),
"imeth": rec_imeth,
"modelnam": rec.get("modelnam", ""),
"paknam": rec.get("paknam", ""),
"modelnam2": rec.get("modelnam2", ""),
"paknam2": rec.get("paknam2", ""),
}
)
# Use inferred shape if dimensions not provided
if inferred_shape is not None:
# Determine if user is trying to use DISV (nlay + ncpl)
if ncpl is not None or (
nlay is not None and nrow is None and ncol is None
):
# DISV mode
if len(inferred_shape) == 2:
# 2D array: (nlay, ncpl)
nlay = nlay or inferred_shape[0]
ncpl = ncpl or inferred_shape[1]
elif len(inferred_shape) == 1:
# 1D array: assume nlay=1, infer ncpl
nlay = nlay or 1
ncpl = ncpl or inferred_shape[0]
elif nnodes is None:
# DIS mode (default)
if nlay is None or nrow is None or ncol is None:
if len(inferred_shape) == 3:
# 3D array: (nlay, nrow, ncol)
nlay = nlay or inferred_shape[0]
nrow = nrow or inferred_shape[1]
ncol = ncol or inferred_shape[2]
elif len(inferred_shape) == 2:
# 2D array: (nrow, ncol), assume nlay=1
nlay = nlay or 1
nrow = nrow or inferred_shape[0]
ncol = ncol or inferred_shape[1]
else:
raise ValueError("data must be dict or list")
if len(records) == 0:
raise ValueError("No data records provided")
# Check supported imeth values
for rec in records:
if rec["imeth"] not in (1, 6):
raise NotImplementedError(
f"Only imeth=1 and imeth=6 are currently supported, "
f"got imeth={rec['imeth']}"
)
# Determine discretization type and calculate nnodes
# DIS: nlay, nrow, ncol
# DISV: nlay, ncpl
# DISU: nnodes
first_imeth = records[0]["imeth"]
# Determine discretization type
if nnodes is not None:
# DISU (unstructured)
dis_type = "DISU"
# For DISU header: (nnodes, 1, -1)
nlay = 1 # -nlay = -1 in header
nrow = 1
ncol = nnodes
elif ncpl is not None:
# DISV (vertically staggered)
dis_type = "DISV"
if nlay is None:
nlay = 1
# Calculate nnodes from nlay * ncpl
nnodes = nlay * ncpl
# For DISV, nrow is always 1 in the header
nrow = 1
# ncol is set to ncpl for header writing
ncol = ncpl
else:
# DIS (structured) - default
dis_type = "DIS"
if first_imeth == 1:
first_data = records[0]["data"]
first_text = records[0].get("text", text).strip()
# FLOW-JA-FACE is connection-based, not node-based
if first_text == "FLOW-JA-FACE":
# For FLOW-JA-FACE, dimensions must be provided
if nlay is None or nrow is None or ncol is None:
raise ValueError(
"For FLOW-JA-FACE data, dimensions must be provided"
)
nnodes = nlay * nrow * ncol
else:
# For regular node-based data, infer from data size
data_size = len(first_data)
# If all dimensions provided, validate them
if nlay is not None and nrow is not None and ncol is not None:
expected = nlay * nrow * ncol
if expected != data_size:
raise ValueError(
f"Dimensions don't match: nlay={nlay}, nrow={nrow}, "
f"ncol={ncol} gives {expected} nodes but "
f"data has {data_size}"
)
nnodes = data_size
else:
# Set defaults to make it work for common cases
nnodes = data_size
if nlay is None:
nlay = 1
if nrow is None:
nrow = 1
if ncol is None:
ncol = nnodes
else:
# For imeth=6, use defaults if not provided
if nlay is None:
nlay = 1
if nrow is None:
nrow = 1
if ncol is None:
ncol = 1
nnodes = nlay * nrow * ncol
# Set precision dtype
realtype = np.float32 if precision == "single" else np.float64
# Prepare dtypes for headers
h1dt = np.dtype(
[
("kstp", np.int32),
("kper", np.int32),
("text", "S16"),
("ncol", np.int32),
("nrow", np.int32),
("nlay", np.int32),
]
)
h2dt = np.dtype(
[
("imeth", np.int32),
("delt", realtype),
("pertim", realtype),
("totim", realtype),
]
)
# Helper function to pad text (using module-level helper)
pad_text = _pad_text_to_16
# Create temporary file if no filename provided
if filename is None:
fd, filename = tempfile.mkstemp(suffix=".cbc")
import os
os.close(fd)
# Write binary file
if verbose:
print(f"Writing binary budget file: {filename}")
print(f" Precision: {precision}")
print(f" Discretization: {dis_type}")
if dis_type == "DIS":
print(f" Dimensions: {nlay} layers, {nrow} rows, {ncol} columns")
elif dis_type == "DISV":
print(f" Dimensions: {nlay} layers, {ncpl} cells per layer")
elif dis_type == "DISU":
print(f" Dimensions: {nnodes} nodes (unstructured)")
print(f" Number of records: {len(records)}")
with open(filename, "wb") as f:
for rec in records:
# Pad text
text_bytes = pad_text(rec["text"])
rec_imeth = rec["imeth"]
if rec_imeth == 1:
# Write header 1 for full 3D array
# For FLOW-JA-FACE (connection-based): use (data_size, 1, -1)
# For other records (node-based): use (ncol, nrow, -nlay)
rec_text = rec["text"].strip()
if rec_text == "FLOW-JA-FACE":
# Connection-based data - use actual data size
h1 = np.array(
(
rec["kstp"],
rec["kper"],
text_bytes,
len(rec["data"]),
1,
-1,
),
dtype=h1dt,
)
else:
# Node-based data - use grid dimensions
h1 = np.array(
(rec["kstp"], rec["kper"], text_bytes, ncol, nrow, -nlay),
dtype=h1dt,
)
h1.tofile(f)
# Write header 2
h2 = np.array(
(
rec_imeth,
realtype(rec["delt"]),
realtype(rec["pertim"]),
realtype(rec["totim"]),
),
dtype=h2dt,
)
h2.tofile(f)
# Write data
arr = rec["data"].astype(realtype)
# Skip size validation for FLOW-JA-FACE (connection-based)
rec_text = rec["text"].strip()
if rec_text != "FLOW-JA-FACE" and len(arr) != nnodes:
raise ValueError(
f"Inconsistent data sizes: expected {nnodes}, "
f"got {len(arr)}"
)
arr.tofile(f)
elif rec_imeth == 6:
# Write header 1 for list-based data
# For imeth=6, use ncol=1, nrow=1, nlay=-1
h1 = np.array(
(rec["kstp"], rec["kper"], text_bytes, 1, 1, -1),
dtype=h1dt,
)
h1.tofile(f)
# Write header 2
h2 = np.array(
(
rec_imeth,
realtype(rec["delt"]),
realtype(rec["pertim"]),
realtype(rec["totim"]),
),
dtype=h2dt,
)
h2.tofile(f)
# Write modelnam, paknam, modelnam2, paknam2
# Use same defaults as binary_util.py write_budget function
defaults = {
"modelnam": "",
"paknam": "",
"modelnam2": "",
"paknam2": "",
}
for name in ["modelnam", "paknam", "modelnam2", "paknam2"]:
name_bytes = pad_text(rec.get(name, defaults[name]))
f.write(name_bytes)
# Get data and determine columns
arr = rec["data"]
if not isinstance(arr, np.ndarray) or arr.dtype.names is None:
raise ValueError(
"For imeth=6, data must be a structured numpy array "
"with named fields"
)
# Calculate ndat (number of floating point columns)
# Expecting fields like: ID1, ID2, FLOW, [aux1, aux2, ...]
colnames = arr.dtype.names
ndat = len(colnames) - 2 # Exclude ID1, ID2
# Write ndat
np.array([ndat], dtype=np.int32).tofile(f)
# Write auxiliary column names (if any)
naux = ndat - 1 # Exclude FLOW
if naux > 0:
aux_names = colnames[3:] # Skip ID1, ID2, FLOW
for aux_name in aux_names:
# Auxiliary names must be space-padded (not null-padded)
# to match MODFLOW 6 expectations
aux_str = (
aux_name
if isinstance(aux_name, str)
else aux_name.decode("ascii")
)
aux_bytes = f"{aux_str:16}".encode("ascii")
f.write(aux_bytes)
# Write nlist
nlist = arr.shape[0]
np.array([nlist], dtype=np.int32).tofile(f)
# Write data - need to ensure correct dtypes
# Convert to proper precision for floating point fields
dt_list = []
for i, name in enumerate(colnames):
if i < 2: # ID1, ID2
dt_list.append((name, np.int32))
else: # FLOW and auxiliary variables
dt_list.append((name, realtype))
# Convert data to correct dtype
arr_typed = np.empty(nlist, dtype=np.dtype(dt_list))
for name in colnames:
arr_typed[name] = arr[name]
arr_typed.tofile(f)
# Explicitly flush to ensure data is written
f.flush()
import os
os.fsync(f.fileno())
# Return an instance with the file open
return cls(filename, precision=precision, verbose=verbose)
def __len__(self):
"""
Return the number of records (headers) in the file.
"""
return len(self.recordarray)
@property
def nrecords(self):
"""
Return the number of records (headers) in the file.
.. deprecated:: 3.8.0
Use :meth:`len` instead.
"""
warnings.warn(
"obj.nrecords is deprecated; use len(obj) instead.",
DeprecationWarning,
)
return len(self)
def __reset(self):
"""
Reset indexing lists when determining precision
"""
self.file.seek(0, 0)
self.times = []
self.kstpkper = []
self.recordarray = []
self.iposheader = []
self.iposarray = []
self.textlist = []
self.imethlist = []
self.paknamlist_from = []
self.paknamlist_to = []
def _set_precision(self, precision="single"):
"""
Method to set the budget precision from a CBC file. Enables
Auto precision code to work
Parameters
----------
precision : str
budget file precision (accepts 'single' or 'double')
"""
success = True
h1dt = [
("kstp", "i4"),
("kper", "i4"),
("text", "S16"),
("ncol", "i4"),
("nrow", "i4"),
("nlay", "i4"),
]
if precision == "single":
self.realtype = np.float32
ffmt = "f4"
else:
self.realtype = np.float64
ffmt = "f8"
h2dt0 = [
("imeth", "i4"),
("delt", ffmt),
("pertim", ffmt),
("totim", ffmt),
]
h2dt = [
("imeth", "i4"),
("delt", ffmt),
("pertim", ffmt),
("totim", ffmt),
("modelnam", "S16"),
("paknam", "S16"),
("modelnam2", "S16"),
("paknam2", "S16"),
]
self.header1_dtype = np.dtype(h1dt)
self.header2_dtype0 = np.dtype(h2dt0)
self.header2_dtype = np.dtype(h2dt)
hdt = h1dt + h2dt
self.header_dtype = np.dtype(hdt)
try:
self._build_index()
except (BudgetIndexError, EOFError) as e:
success = False
self.__reset()
return success
def _totim_from_kstpkper(self, kstpkper):
if self.dis is None:
return -1.0
kstp, kper = kstpkper
perlen = self.dis.perlen.array
nstp = self.dis.nstp.array[kper]
tsmult = self.dis.tsmult.array[kper]
kper_len = np.sum(perlen[:kper])
this_perlen = perlen[kper]
if tsmult == 1:
dt1 = this_perlen / float(nstp)
else:
dt1 = this_perlen * (tsmult - 1.0) / ((tsmult**nstp) - 1.0)
kstp_len = [dt1]
for i in range(kstp + 1):
kstp_len.append(kstp_len[-1] * tsmult)
kstp_len = sum(kstp_len[: kstp + 1])
return kper_len + kstp_len
def _build_index(self):
"""
Build the ordered dictionary, which maps the header information
to the position in the binary file.
"""
# read first record
header = self._get_header()
nrow = header["nrow"]
ncol = header["ncol"]
text = header["text"].decode("ascii").strip()
if nrow < 0 or ncol < 0:
raise ValueError("negative nrow, ncol")
if text != "FLOW-JA-FACE":
self.nrow = nrow
self.ncol = ncol
self.nlay = np.abs(header["nlay"])
self.file.seek(0, 2)
self.totalbytes = self.file.tell()
self.file.seek(0, 0)
self.recorddict = {}
# read the remaining records
ipos = 0
while ipos < self.totalbytes:
self.iposheader.append(ipos)
header = self._get_header()
totim = header["totim"]
# if old-style (non-compact) file,
# compute totim from kstp and kper
if not self.compact:
totim = self._totim_from_kstpkper(
(header["kstp"] - 1, header["kper"] - 1)
)
header["totim"] = totim
if totim >= 0 and totim not in self.times:
self.times.append(totim)
kstpkper = (header["kstp"], header["kper"])
if kstpkper not in self.kstpkper:
self.kstpkper.append(kstpkper)
if header["text"] not in self.textlist:
# check the precision of the file using text records
tlist = [header["text"], header["modelnam"]]
for text in tlist:
if len(text) == 0:
continue
charbytes = list(text)
if min(charbytes) < 32 or max(charbytes) > 126:
# not in conventional ASCII range
raise BudgetIndexError("Improper precision")
self.textlist.append(header["text"])
self.imethlist.append(header["imeth"])
if header["paknam"] not in self.paknamlist_from:
self.paknamlist_from.append(header["paknam"])
if header["paknam2"] not in self.paknamlist_to:
self.paknamlist_to.append(header["paknam2"])
ipos = self.file.tell()
if self.verbose:
for itxt in [
"kstp",
"kper",
"text",
"ncol",
"nrow",
"nlay",
"imeth",
"delt",
"pertim",
"totim",
"modelnam",
"paknam",
"modelnam2",
"paknam2",
]:
s = header[itxt]
print(f"{itxt}: {s}")
print("file position: ", ipos)
if header["imeth"].item() not in {5, 6, 7}:
print("")
# set the nrow, ncol, and nlay if they have not been set
if self.nrow == 0:
text = header["text"].decode("ascii").strip()
if text != "FLOW-JA-FACE":
self.nrow = header["nrow"]
self.ncol = header["ncol"]
self.nlay = np.abs(header["nlay"])
# store record and byte position mapping
self.recorddict[tuple(header)] = (
ipos # store the position right after header2
)
self.recordarray.append(header)
self.iposarray.append(ipos) # store the position right after header2
# skip over the data to the next record and set ipos
self._skip_record(header)
ipos = self.file.tell()
# convert to numpy arrays
self.recordarray = np.array(self.recordarray, dtype=self.header_dtype)
self.iposheader = np.array(self.iposheader, dtype=np.int64)
self.iposarray = np.array(self.iposarray, dtype=np.int64)
self.nper = self.recordarray["kper"].max()
# provide headers as a pandas frame
self.headers = pd.DataFrame(self.recordarray, index=self.iposarray)
# remove irrelevant columns
cols = self.headers.columns.to_list()
unique_imeth = self.headers["imeth"].unique()
if unique_imeth.max() == 0:
drop_cols = cols[cols.index("imeth") :]
elif 6 not in unique_imeth:
drop_cols = cols[cols.index("modelnam") :]
else:
drop_cols = []
if drop_cols:
self.headers.drop(columns=drop_cols, inplace=True)
for name in self.headers.columns:
dtype = self.header_dtype[name]
if np.issubdtype(dtype, bytes): # convert to str
self.headers[name] = (
self.headers[name].str.decode("ascii", "strict").str.strip()
)
def _skip_record(self, header):
"""
Skip over this record, not counting header and header2.
"""
nlay = abs(header["nlay"])
nrow = header["nrow"]
ncol = header["ncol"]
imeth = header["imeth"]
realtype_nbytes = self.realtype(1).nbytes
if imeth == 0:
nbytes = nrow * ncol * nlay * realtype_nbytes
elif imeth == 1:
nbytes = nrow * ncol * nlay * realtype_nbytes
elif imeth == 2:
nlist = binaryread(self.file, np.int32)[0]
nbytes = nlist * (4 + realtype_nbytes)
elif imeth == 3:
nbytes = nrow * ncol * realtype_nbytes + (nrow * ncol * 4)
elif imeth == 4:
nbytes = nrow * ncol * realtype_nbytes
elif imeth == 5:
nauxp1 = binaryread(self.file, np.int32)[0]
naux = nauxp1 - 1
naux_nbytes = naux * 16
if naux_nbytes:
check = self.file.seek(naux_nbytes, 1)
if check < naux_nbytes:
raise EOFError
nlist = binaryread(self.file, np.int32)[0]
if self.verbose:
print("naux: ", naux)
print("nlist: ", nlist)
print("")
nbytes = nlist * (4 + realtype_nbytes + naux * realtype_nbytes)
elif imeth == 6:
# read rest of list data
nauxp1 = binaryread(self.file, np.int32)[0]
naux = nauxp1 - 1
naux_nbytes = naux * 16
if naux_nbytes:
check = self.file.seek(naux_nbytes, 1)
if check < naux_nbytes:
raise EOFError
nlist = binaryread(self.file, np.int32)[0]
if self.verbose:
print("naux: ", naux)
print("nlist: ", nlist)
print("")
nbytes = nlist * (4 * 2 + realtype_nbytes + naux * realtype_nbytes)
else:
raise ValueError(f"invalid method code {imeth}")
if nbytes != 0:
self.file.seek(nbytes, 1)
def _get_header(self):
"""
Read the file header
"""
header1 = binaryread(self.file, self.header1_dtype, (1,))
nlay = header1["nlay"]
self.compact = bool(nlay < 0)
if self.compact:
# fill header2 by first reading imeth, delt, pertim and totim
# and then adding modelnames and paknames if imeth = 6
temp = binaryread(self.file, self.header2_dtype0, (1,))
header2 = np.array(
[(0, 0.0, 0.0, 0.0, "", "", "", "")], dtype=self.header2_dtype
)
for name in temp.dtype.names:
header2[name] = temp[name]
if header2["imeth"].item() == 6:
header2["modelnam"] = binaryread(self.file, bytes, charlen=16)
header2["paknam"] = binaryread(self.file, bytes, charlen=16)
header2["modelnam2"] = binaryread(self.file, bytes, charlen=16)
header2["paknam2"] = binaryread(self.file, bytes, charlen=16)
else:
header2 = np.array(
[(0, 0.0, 0.0, 0.0, "", "", "", "")], dtype=self.header2_dtype
)
fullheader = join_struct_arrays([header1, header2])
return fullheader[0]
def _find_text(self, text):
"""
Determine if selected record name is in budget file
"""
# check and make sure that text is in file
text16 = None
if text is not None:
if isinstance(text, bytes):
ttext = text.decode()
else:
ttext = text
for t in self.textlist:
if ttext.upper() in t.decode():
text16 = t
break
if text16 is None:
raise ValueError("The specified text string is not in the budget file")
return text16
def _find_paknam(self, paknam, to=False):
"""
Determine if selected record name is in budget file
"""
# check and make sure that text is in file
paknam16 = None
if paknam is not None:
if isinstance(paknam, bytes):
tpaknam = paknam.decode()
else:
tpaknam = paknam
for t in self._unique_package_names(to):
if tpaknam.upper() in t.decode():
paknam16 = t
break
if paknam16 is None:
raise ValueError(
"The specified package name string is not in the budget file"
)
return paknam16
[docs] def list_records(self):
"""
Print a list of all of the records in the file
.. deprecated:: 3.8.0
Use :attr:`headers` instead.
"""
warnings.warn(
"list_records() is deprecated; use headers instead.",
DeprecationWarning,
)
for rec in self.recordarray:
if isinstance(rec, bytes):
rec = rec.decode()
print(rec)
[docs] def list_unique_records(self):
"""
Print a list of unique record names
.. deprecated:: 3.8.0
Use `headers[["text", "imeth"]].drop_duplicates()` instead.
"""
warnings.warn(
"list_unique_records() is deprecated; use "
'headers[["text", "imeth"]].drop_duplicates() instead.',
DeprecationWarning,
)
print("RECORD IMETH")
print(22 * "-")
for rec, imeth in zip(self.textlist, self.imethlist):
if isinstance(rec, bytes):
rec = rec.decode()
print(f"{rec.strip():16} {imeth:5d}")
[docs] def list_unique_packages(self, to=False):
"""
Print a list of unique package names
.. deprecated:: 3.8.0
Use `headers.paknam.drop_duplicates()` or
`headers.paknam2.drop_duplicates()` instead.
"""
warnings.warn(
"list_unique_packages() is deprecated; use "
"headers.paknam.drop_duplicates() or "
"headers.paknam2.drop_duplicates() instead",
DeprecationWarning,
)
for rec in self._unique_package_names(to):
if isinstance(rec, bytes):
rec = rec.decode()
print(rec)
[docs] def get_unique_record_names(self, decode=False):
"""
Get a list of unique record names in the file
Parameters
----------
decode : bool
Optional boolean used to decode byte strings (default is False).
Returns
-------
names : list of strings
List of unique text names in the binary file.
"""
if decode:
names = []
for text in self.textlist:
if isinstance(text, bytes):
text = text.decode()
names.append(text)
else:
names = self.textlist
return names
[docs] def get_unique_package_names(self, decode=False, to=False):
"""
Get a list of unique package names in the file
Parameters
----------
decode : bool
Optional boolean used to decode byte strings (default is False).
Returns
-------
names : list of strings
List of unique package names in the binary file.
"""
if decode:
names = []
for text in self._unique_package_names(to):
if isinstance(text, bytes):
text = text.decode()
names.append(text)
else:
names = self._unique_package_names(to)
return names
def _unique_package_names(self, to=False):
"""
Get a list of unique package names in the file
Returns
-------
out : list of strings
List of unique package names in the binary file.
"""
return self.paknamlist_to if to else self.paknamlist_from
[docs] def get_kstpkper(self):
"""
Get a list of unique tuples (stress period, time step) in the file.
Indices are 0-based, use the `kstpkper` attribute for 1-based.
Returns
-------
list of (kstp, kper) tuples
List of unique combinations of stress period &
time step indices (0-based) in the binary file
"""
return [(kstp - 1, kper - 1) for kstp, kper in self.kstpkper]
[docs] def get_indices(self, text=None):
"""
Get a list of indices for a selected record name
Parameters
----------
text : str
The text identifier for the record. Examples include
'RIVER LEAKAGE', 'STORAGE', 'FLOW RIGHT FACE', etc.
Returns
-------
out : tuple
indices of selected record name in budget file.
"""
# check and make sure that text is in file
if text is not None:
text16 = self._find_text(text)
select_indices = np.asarray(self.recordarray["text"] == text16).nonzero()
if isinstance(select_indices, tuple):
select_indices = select_indices[0]
else:
select_indices = None
return select_indices
[docs] def get_position(self, idx, header=False):
"""
Get the starting position of the data or header for a specified record
number in the binary budget file.
Parameters
----------
idx : int
The zero-based record number. The first record is record 0.
header : bool
If True, the position of the start of the header data is returned.
If False, the position of the start of the data is returned
(default is False).
Returns
-------
ipos : int64
The position of the start of the data in the cell budget file
or the start of the header.
"""
if header:
ipos = self.iposheader[idx]
else:
ipos = self.iposarray[idx]
return ipos
[docs] def get_data(
self,
idx=None,
kstpkper=None,
totim=None,
text=None,
paknam=None,
paknam2=None,
full3D=False,
) -> Union[list, np.ndarray]:
"""
Get data from the binary budget file.
Parameters
----------
idx : int or list
The zero-based record number. The first record is record 0.
kstpkper : tuple of ints
A tuple containing the time step and stress period (kstp, kper).
The kstp and kper values are zero based.
totim : float
The simulation time.
text : str
The text identifier for the record. Examples include
'RIVER LEAKAGE', 'STORAGE', 'FLOW RIGHT FACE', etc.
paknam : str
The `from` package name for the record.
paknam2 : str
The `to` package name for the record. This argument can be
useful for MODFLOW 6 budget files if multiple packages of
the same type are specified. The paknam2 argument can be
specified as the package name (not the package type) in
order to retrieve budget data for a specific named package.
full3D : boolean
If true, then return the record as a three dimensional numpy
array, even for those list-style records written as part of a
'COMPACT BUDGET' MODFLOW budget file. (Default is False.)
Returns
-------
recordlist : list of records
A list of budget objects. The structure of the returned object
depends on the structure of the data in the cbb file.
If full3D is True, then this method will return a numpy masked
array of size (nlay, nrow, ncol) for those list-style
'COMPACT BUDGET' records written by MODFLOW.
See Also
--------
Notes
-----
Examples
--------
"""
# trap for totim error
if totim is not None:
if len(self.times) == 0:
errmsg = """This is an older style budget file that
does not have times in it. Use the MODFLOW
compact budget format if you want to work with
times. Or you may access this file using the
kstp and kper arguments or the idx argument."""
raise ValueError(errmsg)
# check and make sure that text is in file
text16 = None
if text is not None:
text16 = self._find_text(text)
paknam16 = None
if paknam is not None:
paknam16 = self._find_paknam(paknam)
paknam16_2 = None
if paknam2 is not None:
paknam16_2 = self._find_paknam(paknam2, to=True)
# build the selection mask
select_indices = np.array([True] * len(self.recordarray))
selected = False
if idx is not None:
select_indices[idx] = False
select_indices = ~select_indices
selected = True
if kstpkper is not None:
kstp1 = kstpkper[0] + 1
kper1 = kstpkper[1] + 1
select_indices = select_indices & (self.recordarray["kstp"] == kstp1)
select_indices = select_indices & (self.recordarray["kper"] == kper1)
selected = True
if text16 is not None:
select_indices = select_indices & (self.recordarray["text"] == text16)
selected = True
if paknam16 is not None:
select_indices = select_indices & (self.recordarray["paknam"] == paknam16)
selected = True
if paknam16_2 is not None:
select_indices = select_indices & (
self.recordarray["paknam2"] == paknam16_2
)
selected = True
if totim is not None:
select_indices = select_indices & np.isclose(
self.recordarray["totim"], totim
)
selected = True
if not selected:
raise TypeError(
"get_data() missing 1 required argument: 'kstpkper', 'totim', "
"'idx', or 'text'"
)
return [
self.get_record(idx, full3D=full3D)
for idx, t in enumerate(select_indices)
if t
]
[docs] def to_geodataframe(
self,
gdf=None,
modelgrid=None,
idx=None,
kstpkper=None,
totim=None,
text=None,
):
if (idx, kstpkper, totim) == (None, None, None):
raise AssertionError(
"to_geodataframe() missing 1 required argument: "
"please provide 'idx', 'kstpkper', or 'totim'"
)
if gdf is None:
if modelgrid is None:
if self.modelgrid is None:
raise AssertionError(
"A geodataframe or modelgrid instance must be supplied"
)
modelgrid = self.modelgrid
gdf = modelgrid.to_geodataframe()
col_names = []
if text is None:
textlist = [i.decode() for i in self.textlist]
for text in textlist:
data = self.get_data(
idx=idx, kstpkper=kstpkper, totim=totim, text=text, full3D=True
)
text = text.strip().lower().replace(" ", "_")
for ix, arr in enumerate(data[0]):
name = f"{text}_{ix}"
gdf[name] = arr.ravel()
col_names.append(name)
else:
data = self.get_data(
idx=idx, kstpkper=kstpkper, totim=totim, text=text, full3D=True
)
text = text.strip().lower().replace(" ", "_")
for ix, arr in enumerate(data[0]):
name = f"{text}_{ix}"
gdf[name] = arr.ravel()
col_names.append(name)
return gdf
[docs] def get_ts(self, idx, text=None, times=None, variable="q"):
"""
Get a time series from the binary budget file.
Parameters
----------
idx : int, tuple of ints, or list of such
Acceptable values depend on grid type:
- Structured grids (DIS): (layer, row, column) or list of such
- Vertex grids (DISV): (layer, cellid) or list of such
- Unstructured grids (DISU): node number or list of such
All indices must be zero-based.
For backwards compatibility, DISV and DISU grids also accept the old
3-tuple format with dummy values: (layer, dummy, cellid) for DISV and
(dummy, dummy, node) for DISU.
text : str
The text identifier for the record. Examples include
'RIVER LEAKAGE', 'STORAGE', 'FLOW RIGHT FACE', etc.
times : iterable of floats
List of times to from which to get time series.
variable : str, optional
Variable name to extract from the budget record. Default is 'q'.
For records with auxiliary variables (e.g., 'DATA-SPDIS', 'DATA-SAT'),
this can be used to access auxiliary data. Examples include 'qx',
'qy', 'qz' for specific discharge components.
Returns
-------
out : numpy array
Array has size (ntimes, ncells + 1). The first column in the
data array will contain time (totim).
Notes
-----
Index ranges (zero-based):
- DIS: 0 <= layer < nlay, 0 <= row < nrow, 0 <= col < ncol
- DISV: 0 <= layer < nlay, 0 <= cellid < ncpl
- DISU: 0 <= node < nnodes
Examples
--------
>>> # DIS grid: layer 0, row 5, column 5
>>> ts = cbb.get_ts(idx=(0, 5, 5), text='DATA-SPDIS', variable='qx')
>>> # DISV grid: layer 0, cell 12
>>> ts = cbb.get_ts(idx=(0, 12), text='DATA-SPDIS', variable='qx')
>>> # DISU grid: node 10
>>> ts = cbb.get_ts(idx=10, text='FLOW-JA-FACE')
"""
if text is None:
raise ValueError(
"text keyword must be provided to CellBudgetFile get_ts() method."
)
cellids = self._cellids(idx)
ncells = len(cellids)
result = np.empty((len(self.kstpkper), ncells + 1), dtype=self.realtype)
result[:, :] = np.nan
if len(self.times) == result.shape[0]:
result[:, 0] = np.array(self.times)
timesint = self.get_times()
kstpkper = self.get_kstpkper()
nsteps = len(kstpkper)
if len(timesint) < 1:
if times is None:
timesint = [x + 1 for x in range(nsteps)]
else:
if isinstance(times, np.ndarray):
times = times.tolist()
if len(times) != nsteps:
raise ValueError(
f"number of times provided ({len(times)}) must equal "
f"number of time steps in cell budget file ({nsteps})"
)
timesint = times
for i, t in enumerate(timesint):
result[i, 0] = t
use_full3d = variable == "q" and (
self.modelgrid is None or self.modelgrid.grid_type == "structured"
)
for itim, kstpkper_ in enumerate(kstpkper):
if use_full3d:
v = self.get_data(kstpkper=kstpkper_, text=text, full3D=True)
if len(v) == 0:
continue
v = v[0]
istat = 1
for k, i, j in cellids:
result[itim, istat] = v[k, i, j].copy()
istat += 1
else:
v = self.get_data(kstpkper=kstpkper_, text=text)
if len(v) == 0:
continue
if self.modelgrid is None:
raise ValueError(
"A modelgrid instance must be provided during "
"instantiation to get IMETH=6 timeseries data"
)
nodes = self._cellid_to_node(cellids)
for vv in v:
# Check if this is a recarray (IMETH=6) or plain array (IMETH=1)
if vv.dtype.names is None:
# IMETH=1: Plain array - extract values at specified cells
# MODFLOW 6 stores IMETH=1 data as 3D arrays with padding:
# - DIS: (nlay, nrow, ncol) - natural 3D structure
# - DISV: (nlay, 1, ncpl) - middle dim is padding
# - DISU: (1, 1, nnodes) - first two dims are padding
if self.modelgrid.grid_type == "vertex":
# DISV: shape is (nlay, 1, ncpl)
# Extract as vv[k, 0, cellid] to handle padding dimension
istat = 1
for k, cell in cellids:
result[itim, istat] = vv[k, 0, cell].copy()
istat += 1
else:
# DISU: shape is (1, 1, nnodes)
# Extract as vv[0, 0, node] to handle padding dimensions
istat = 1
for node in cellids:
result[itim, istat] = vv[0, 0, node].copy()
istat += 1
else:
# IMETH=6: Recarray with named fields
available = vv.dtype.names
if variable not in available:
raise ValueError(
f"Variable '{variable}' not found in budget record. "
f"Available variables: {list(available)}"
)
dix = np.asarray(np.isin(vv["node"], nodes)).nonzero()[0]
if len(dix) > 0:
result[itim, 1:] = vv[variable][dix]
return result
def _cellids(self, idx):
if isinstance(idx, int):
idx_list = [idx]
elif isinstance(idx, tuple):
idx_list = [idx]
elif isinstance(idx, list):
idx_list = idx
else:
raise TypeError(
f"Invalid index type, expected int, tuple "
f"or list of such, got {type(idx)}"
)
grid_type = "structured" if self.modelgrid is None else self.modelgrid.grid_type
cellid = []
for item in idx_list:
if grid_type == "structured":
if not isinstance(item, (list, tuple)) or len(item) != 3:
raise ValueError(
f"Expected DIS cell index (layer, row, col), got: {item}"
)
k, i, j = item
if k < 0 or k >= self.nlay:
raise ValueError(f"Layer index {k} out of range [0, {self.nlay})")
if i < 0 or i >= self.nrow:
raise ValueError(f"Row index {i} out of range [0, {self.nrow})")
if j < 0 or j >= self.ncol:
raise ValueError(f"Column index {j} out of range [0, {self.ncol})")
cellid.append((k, i, j))
elif grid_type == "vertex":
if isinstance(item, (list, tuple)):
if len(item) == 2:
# proper format: (layer, cellid)
k, cell = item
elif len(item) == 3:
# old format: (layer, dummy, cellid)
k, cell = item[0], item[2]
else:
raise ValueError(
f"Expected DISV cell index (layer, cellid) "
f"or (layer, dummy, cellid), got: {item}"
)
else:
raise ValueError(
f"Expected DISV cell index (layer, cellid) or "
f"(layer, dummy, cellid), got: {item}"
)
if k < 0 or k >= self.nlay:
raise ValueError(f"Layer index {k} out of range [0, {self.nlay})")
if cell < 0 or cell >= self.modelgrid.ncpl:
raise ValueError(
f"Cell index {cell} out of range [0, {self.modelgrid.ncpl})"
)
cellid.append((k, cell))
else:
if isinstance(item, (int, np.integer)):
# proper format: just node number
node = int(item)
elif isinstance(item, (list, tuple)):
if len(item) == 3:
# old format: (dummy, dummy, node)
node = int(item[2])
elif len(item) == 1:
# Also support single-element tuple
node = int(item[0])
else:
raise ValueError(
f"Expected DISU node number or (dummy, dummy, node), "
f"got: {item}"
)
else:
raise ValueError(
f"Expected DISU node number or (dummy, dummy, node), "
f"got: {item}"
)
if node < 0 or node >= self.modelgrid.nnodes:
raise ValueError(
f"Node index {node} out of range [0, {self.modelgrid.nnodes})"
)
cellid.append(node)
return cellid
def _cellid_to_node(self, cellids) -> list[int]:
"""Convert 0-based cellids to 1-based MODFLOW node numbers."""
# Get 0-based nodes from grid, then convert to 1-based (vectorized)
# UnstructuredGrid.get_node() accepts both plain ints and tuples
nodes_0based = self.modelgrid.get_node(cellids)
return (np.array(nodes_0based) + 1).tolist()
[docs] def get_record(self, idx, full3D=False):
"""
Get a single data record from the budget file.
Parameters
----------
idx : int
The zero-based record number. The first record is record 0.
full3D : boolean
If true, then return the record as a three dimensional numpy
array, even for those list-style records written as part of a
'COMPACT BUDGET' MODFLOW budget file. (Default is False.)
Returns
-------
record : a single data record
The structure of the returned object depends on the structure of
the data in the cbb file. Compact list data are returned as
If full3D is True, then this method will return a numpy masked
array of size (nlay, nrow, ncol) for those list-style
'COMPACT BUDGET' records written by MODFLOW.
See Also
--------
Notes
-----
Examples
--------
"""
# idx must be an ndarray, so if it comes in as an integer then convert
if np.isscalar(idx):
idx = np.array([idx])
header = self.recordarray[idx]
ipos = self.iposarray[idx].item()
self.file.seek(ipos, 0)
imeth = header["imeth"][0]
t = header["text"][0].decode("ascii")
s = f"Returning {t.strip()} as "
nlay = abs(header["nlay"][0])
nrow = header["nrow"][0]
ncol = header["ncol"][0]
# default method
if imeth == 0:
if self.verbose:
s += f"an array of shape {(nlay, nrow, ncol)}"
print(s)
return binaryread(self.file, self.realtype(1), shape=(nlay, nrow, ncol))
# imeth 1
elif imeth == 1:
if self.verbose:
s += f"an array of shape {(nlay, nrow, ncol)}"
print(s)
return binaryread(self.file, self.realtype(1), shape=(nlay, nrow, ncol))
# imeth 2
elif imeth == 2:
nlist = binaryread(self.file, np.int32)[0]
dtype = np.dtype([("node", np.int32), ("q", self.realtype)])
if self.verbose:
if full3D:
s += f"a numpy masked array of size ({nlay}, {nrow}, {ncol})"
else:
s += f"a numpy recarray of size ({nlist}, 2)"
print(s)
data = binaryread(self.file, dtype, shape=(nlist,))
if full3D:
return self.__create3D(data)
else:
return data.view(np.recarray)
# imeth 3
elif imeth == 3:
ilayer = binaryread(self.file, np.int32, shape=(nrow, ncol))
data = binaryread(self.file, self.realtype(1), shape=(nrow, ncol))
if self.verbose:
if full3D:
s += f"a numpy masked array of size ({nlay}, {nrow}, {ncol})"
else:
s += (
"a list of two 2D numpy arrays. The first is an "
f"integer layer array of shape ({nrow}, {ncol}). The "
f"second is real data array of shape ({nrow}, {ncol})"
)
print(s)
if full3D:
out = np.ma.zeros(self.nnodes, dtype=np.float32)
out.mask = True
vertical_layer = ilayer.flatten() - 1
# create the 2D cell index and then move it to
# the correct vertical location
idx = np.arange(0, vertical_layer.shape[0])
idx += vertical_layer * nrow * ncol
out[idx] = data.flatten()
return out.reshape(self.shape)
else:
return [ilayer, data]
# imeth 4
elif imeth == 4:
if self.verbose:
s += f"a 2d numpy array of size ({nrow}, {ncol})"
print(s)
return binaryread(self.file, self.realtype(1), shape=(nrow, ncol))
# imeth 5
elif imeth == 5:
nauxp1 = binaryread(self.file, np.int32)[0]
naux = nauxp1 - 1
l = [("node", np.int32), ("q", self.realtype)]
for i in range(naux):
auxname = binaryread(self.file, bytes, charlen=16)
l.append((auxname.decode("ascii").strip(), self.realtype))
dtype = np.dtype(l)
nlist = binaryread(self.file, np.int32)[0]
data = binaryread(self.file, dtype, shape=(nlist,))
if full3D:
if self.verbose:
s += f"a list array of shape ({nlay}, {nrow}, {ncol})"
print(s)
return self.__create3D(data)
else:
if self.verbose:
s += f"a numpy recarray of size ({nlist}, {2 + naux})"
print(s)
return data.view(np.recarray)
# imeth 6
elif imeth == 6:
# read rest of list data
nauxp1 = binaryread(self.file, np.int32)[0]
naux = nauxp1 - 1
l = [("node", np.int32), ("node2", np.int32), ("q", self.realtype)]
for i in range(naux):
auxname = binaryread(self.file, bytes, charlen=16)
l.append((auxname.decode("ascii").strip(), self.realtype))
dtype = np.dtype(l)
nlist = binaryread(self.file, np.int32)[0]
data = binaryread(self.file, dtype, shape=(nlist,))
if self.verbose:
if full3D:
s += f"a list array of shape ({nlay}, {nrow}, {ncol})"
else:
s += f"a numpy recarray of size ({nlist}, 2)"
print(s)
if full3D:
data = self.__create3D(data)
if self.modelgrid is not None:
return np.reshape(data, self.shape)
else:
return data
else:
return data.view(np.recarray)
else:
raise ValueError(f"invalid imeth value - {imeth}")
# should not reach this point
return
def __create3D(self, data):
"""
Convert a dictionary of {node: q, ...} into a numpy masked array.
Used to create full grid arrays when the full3D keyword is set
to True in get_data.
Parameters
----------
data : dictionary
Dictionary with node keywords and flows (q) items.
Returns
-------
out : numpy masked array
List contains unique simulation times (totim) in binary file.
"""
out = np.ma.zeros(self.nnodes, dtype=data["q"].dtype)
out.mask = True
for [node, q] in zip(data["node"], data["q"]):
idx = node - 1
out.data[idx] += q
out.mask[idx] = False
return np.ma.reshape(out, self.shape)
[docs] def get_times(self):
"""
Get a list of unique times in the file
Returns
-------
out : list of floats
List contains unique simulation times (totim) in binary file.
"""
return self.times
[docs] def get_nrecords(self):
"""
Return the number of records in the file
Returns
-------
int
Number of records in the file.
.. deprecated:: 3.8.0
Use :meth:`len` instead.
"""
warnings.warn(
"get_nrecords is deprecated; use len(obj) instead.",
DeprecationWarning,
)
return len(self)
[docs] def get_residual(self, totim, scaled=False):
"""
Return an array the size of the model grid containing the flow residual
calculated from the budget terms. Residual will not be correct unless
all flow terms are written to the budget file.
Parameters
----------
totim : float
Simulation time for which to calculate the residual. This value
must be precise, so it is best to get it from the get_times
method.
scaled : bool
If True, then divide the residual by the total cell inflow
Returns
-------
residual : np.ndarray
The flow residual for the cell of shape (nlay, nrow, ncol)
"""
nlay = self.nlay
nrow = self.nrow
ncol = self.ncol
residual = np.zeros((nlay, nrow, ncol), dtype=float)
if scaled:
inflow = np.zeros((nlay, nrow, ncol), dtype=float)
select_indices = np.asarray(self.recordarray["totim"] == totim).nonzero()[0]
for i in select_indices:
text = self.recordarray[i]["text"].decode()
if self.verbose:
print(f"processing {text}")
flow = self.get_record(idx=i, full3D=True)
if ncol > 1 and "RIGHT FACE" in text:
residual -= flow[:, :, :]
residual[:, :, 1:] += flow[:, :, :-1]
if scaled:
idx = np.asarray(flow < 0.0).nonzero()
inflow[idx] -= flow[idx]
idx = np.asarray(flow > 0.0).nonzero()
l, r, c = idx
idx = (l, r, c + 1)
inflow[idx] += flow[idx]
elif nrow > 1 and "FRONT FACE" in text:
residual -= flow[:, :, :]
residual[:, 1:, :] += flow[:, :-1, :]
if scaled:
idx = np.asarray(flow < 0.0).nonzero()
inflow[idx] -= flow[idx]
idx = np.asarray(flow > 0.0).nonzero()
l, r, c = idx
idx = (l, r + 1, c)
inflow[idx] += flow[idx]
elif nlay > 1 and "LOWER FACE" in text:
residual -= flow[:, :, :]
residual[1:, :, :] += flow[:-1, :, :]
if scaled:
idx = np.asarray(flow < 0.0).nonzero()
inflow[idx] -= flow[idx]
idx = np.asarray(flow > 0.0).nonzero()
l, r, c = idx
idx = (l + 1, r, c)
inflow[idx] += flow[idx]
else:
residual += flow
if scaled:
idx = np.asarray(flow > 0.0).nonzero()
inflow[idx] += flow[idx]
if scaled:
residual_scaled = np.zeros((nlay, nrow, ncol), dtype=float)
idx = inflow > 0.0
residual_scaled[idx] = residual[idx] / inflow[idx]
return residual_scaled
return residual
[docs] def export(
self,
filename: Union[str, PathLike],
kstpkper: Optional[list] = None,
text: Optional[Union[str, list]] = None,
**kwargs,
):
"""
Export budget data to a binary file.
Parameters
----------
filename : str or PathLike
Path to output budget file
kstpkper : list of tuples, optional
Subset of (kstp, kper) tuples to export. If None, exports all time steps.
text : str or list of str, optional
Budget term(s) to export. If None, exports all terms.
Examples: 'FLOW-JA-FACE', ['STORAGE', 'CONSTANT HEAD']
**kwargs
Additional keyword arguments:
- precision : str, 'single' or 'double' (default is the file's precision)
- verbose : bool, print progress messages
Examples
--------
>>> cbc = CellBudgetFile('input.cbc')
>>> # Export all data
>>> cbc.export('output.cbc')
>>> # Export specific time steps
>>> cbc.export('output.cbc', kstpkper=[(1, 0), (1, 1)])
>>> # Export specific budget terms
>>> cbc.export('output.cbc', text='FLOW-JA-FACE')
>>> # Export specific terms and time steps
>>> cbc.export(
... 'output.cbc', kstpkper=[(1, 0)], text=['STORAGE', 'FLOW-JA-FACE']
... )
"""
if kstpkper is None:
kstpkper = self.kstpkper
if text is None:
textlist = self.textlist
elif isinstance(text, str):
textlist = [_pad_text_to_16(text)]
else:
textlist = [_pad_text_to_16(t) for t in text]
verbose = kwargs.get("verbose", False)
precision = kwargs.get("precision", self.precision)
realtype = np.float32 if precision == "single" else np.float64
# header dtypes
h1dt = np.dtype(
[
("kstp", np.int32),
("kper", np.int32),
("text", "S16"),
("ncol", np.int32),
("nrow", np.int32),
("nlay", np.int32),
]
)
h2dt = np.dtype(
[
("imeth", np.int32),
("delt", realtype),
("pertim", realtype),
("totim", realtype),
]
)
sorted_kstpkper = sorted(kstpkper, key=lambda x: (int(x[0]), int(x[1])))
text_mapping = {}
for txt in textlist:
txt_str = txt.decode().strip() if isinstance(txt, bytes) else txt.strip()
txt_upper = txt_str.upper()
matching_records = [
t for t in self.textlist if txt_upper in t.decode().strip().upper()
]
text_mapping[txt] = matching_records
nlay = self.nlay if self.nlay > 0 else None
nrow = self.nrow if self.nrow > 0 else None
ncol = self.ncol if self.ncol > 0 else None
if verbose:
print(f"Writing binary budget file: {filename}")
print(f" Precision: {precision}")
if nlay is not None and nrow is not None and ncol is not None:
print(f" Grid shape: {nlay} layers x {nrow} rows x {ncol} cols")
else:
print(" Grid shape not specified")
with open(filename, "wb") as f:
for ksp in sorted_kstpkper:
kstp = int(ksp[0])
kper = int(ksp[1])
# get_data() expects 0-based but kstpkper is 1-based
ksp_0based = (kstp - 1, kper - 1)
if verbose:
print(f"\n Writing kstp={kstp}, kper={kper}")
for txt in textlist:
for file_txt in text_mapping[txt]:
data_list = self.get_data(kstpkper=ksp_0based, text=file_txt)
if not data_list:
continue
data = data_list[0]
mask = (
(self.recordarray["kstp"] == kstp)
& (self.recordarray["kper"] == kper)
& (self.recordarray["text"] == file_txt)
)
records = self.recordarray[mask]
if len(records) == 0:
continue
record = records[0]
if isinstance(data, np.recarray):
imeth = 6 # list
else:
imeth = 1 # array
text_str = file_txt.decode().strip()
text_bytes = _pad_text_to_16(text_str)
delt = float(record["delt"])
pertim = float(record["pertim"])
totim = float(record["totim"])
if verbose:
print(f" Writing {text_str}: imeth={imeth}")
is_flowja = text_str.upper() == "FLOW-JA-FACE"
# Determine dimensions based on data type
if is_flowja and imeth in [0, 1]:
# keep FLOW-JA-FACE flat/size NJA
nja = np.asarray(data).size
ndim1, ndim2, ndim3 = nja, 1, -1
else:
# Regular budget term: use grid dimensions
if nlay is None or nrow is None or ncol is None:
raise ValueError(
f"Grid dimensions (nlay, nrow, ncol) "
f"required for non-FLOW-JA-FACE "
f"budget term '{text_str}'. "
f"Provided: nlay={nlay}, nrow={nrow}, "
f"ncol={ncol}"
)
# negative nlay -> compact format
ndim1, ndim2, ndim3 = ncol, nrow, -nlay
header1 = np.array(
[(kstp, kper, text_bytes, ndim1, ndim2, ndim3)],
dtype=h1dt,
)
header1.tofile(f)
header2 = np.array([(imeth, delt, pertim, totim)], dtype=h2dt)
header2.tofile(f)
if imeth in [0, 1]:
arr = np.asarray(data, dtype=realtype)
# keep FLOW-JA-FACE flat/size NJA.
# reshape other variables to grid.
if is_flowja and arr.ndim != 1:
arr = arr.flatten()
elif arr.ndim == 1:
arr = arr.reshape(nlay, nrow, ncol)
arr.tofile(f)
elif imeth == 6:
# write model and package names
modelnam = record["modelnam"].decode().strip()
paknam = record["paknam"].decode().strip()
modelnam2 = record["modelnam2"].decode().strip()
paknam2 = record["paknam2"].decode().strip()
for name in [modelnam, paknam, modelnam2, paknam2]:
name_bytes = _pad_text_to_16(name)
f.write(name_bytes)
# write naux and aux var names
standard_fields = {"node", "node2", "q"}
auxtxt = [
name
for name in data.dtype.names
if name not in standard_fields
]
naux = len(auxtxt)
np.array([naux + 1], dtype=np.int32).tofile(f)
for auxname in auxtxt:
f.write(_pad_text_to_16(auxname))
if not (isinstance(data, np.ndarray) and data.dtype.names):
raise ValueError(
"For imeth=6, data must be a numpy recarray "
"with fields: node, node2, q, and optional "
"auxiliary fields"
)
# nrite nlist and list data
nlist = len(data)
np.array([nlist], dtype=np.int32).tofile(f)
dt_list = [
("node", np.int32),
("node2", np.int32),
("q", realtype),
]
for auxname in auxtxt:
dt_list.append((auxname, realtype))
output_dt = np.dtype(dt_list)
output_data = np.zeros(nlist, dtype=output_dt)
for field in output_dt.names:
if field in data.dtype.names:
output_data[field] = data[field].astype(
output_dt[field]
)
output_data.tofile(f)
else:
raise NotImplementedError(
"Expected imeth=1 (array) or imeth=6 (list)"
)
if verbose:
print(f"\nSuccessfully wrote {filename}")
[docs] def close(self):
"""
Close the file handle
"""
self.file.close()
[docs] def reverse(self, filename: Optional[PathLike] = None):
"""
Reverse the time order and signs of the currently loaded binary cell budget
file. If a file name is not provided or if the provided name is the same as
the existing filename, the file will be overwritten and reloaded.
Notes
-----
While `HeadFile.reverse()` reverses only the temporal order of head data,
this method must reverse not only the order but also the sign (direction)
of the model's intercell flows.
filename : str or PathLike, optional
Path of the reversed binary cell budget file.
"""
filename = (
Path(filename).expanduser().absolute()
if filename is not None
else self.filename
)
# header array formats
dt1 = np.dtype(
[
("kstp", np.int32),
("kper", np.int32),
("text", "S16"),
("ndim1", np.int32),
("ndim2", np.int32),
("ndim3", np.int32),
("imeth", np.int32),
("delt", np.float64),
("pertim", np.float64),
("totim", np.float64),
]
)
dt2 = np.dtype(
[
("text1id1", "S16"),
("text1id2", "S16"),
("text2id1", "S16"),
("text2id2", "S16"),
]
)
nrecords = len(self)
target = filename
time = ModelTime.from_headers(self.recordarray)
time._set_totim_dict()
trev = time.reverse()
trev._set_totim_dict()
nper = time.nper
seen = set()
def reverse_header(header):
"""Reverse period, step and time fields in the record header"""
nonlocal seen
kper = header["kper"] - 1
kstp = header["kstp"] - 1
header = header.copy()
header["kper"] = nper - kper
header["kstp"] = time.nstp[kper] - kstp
kper = header["kper"] - 1
kstp = header["kstp"] - 1
seen.add((kper, kstp))
header["pertim"] = trev._pertim_dict[kper, kstp]
header["totim"] = trev._totim_dict[kper, kstp]
return header
# if rewriting the same file, write
# temp file then copy it into place
inplace = filename == self.filename
if inplace:
temp_dir_path = Path(tempfile.gettempdir())
temp_file_path = temp_dir_path / filename.name
target = temp_file_path
with open(target, "wb") as f:
# loop over budget file records in reverse order
for idx in range(nrecords - 1, -1, -1):
# load header array
header = self.recordarray[idx]
header = reverse_header(header)
# Write main header information to backward budget file
h = header[
[
"kstp",
"kper",
"text",
"ncol",
"nrow",
"nlay",
"imeth",
"delt",
"pertim",
"totim",
]
]
# Note: much of the code below is based on binary_file_writer.py
h = np.array(h, dtype=dt1)
h.tofile(f)
if header["imeth"] == 6:
# Write additional header information to the backward budget file
h = header[["modelnam", "paknam", "modelnam2", "paknam2"]]
h = np.array(h, dtype=dt2)
h.tofile(f)
# Load data
data = self.get_data(idx)[0]
data = np.array(data)
# Negate flows
data["q"] = -data["q"]
# Write ndat (number of floating point columns)
colnames = data.dtype.names
ndat = len(colnames) - 2
dt = np.dtype([("ndat", np.int32)])
h = np.array([(ndat,)], dtype=dt)
h.tofile(f)
# Write auxiliary column names
naux = ndat - 1
if naux > 0:
auxtxt = ["{:16}".format(colname) for colname in colnames[3:]]
auxtxt = tuple(auxtxt)
dt = np.dtype([(colname, "S16") for colname in colnames[3:]])
h = np.array(auxtxt, dtype=dt)
h.tofile(f)
# Write nlist
nlist = data.shape[0]
dt = np.dtype([("nlist", np.int32)])
h = np.array([(nlist,)], dtype=dt)
h.tofile(f)
elif header["imeth"] == 1:
# Load data
data = self.get_data(idx)[0]
data = np.array(data, dtype=np.float64)
# Negate flows
data = -data
else:
raise ValueError("not expecting imeth " + header["imeth"])
# Write data
data.tofile(f)
# if we rewrote the original file, reinitialize
if inplace:
move(target, filename)
self.__init__(filename, self.precision, self.verbose) # noqa: PLC2801