import numpy as np
from ..utils import import_optional_dependency
[docs]class SfrFile:
"""
Read SFR package results from text file (ISTCB2 > 0)
Parameters
----------
filename : str
Name of the sfr output file
geometries : any
Ignored
verbose : any
Ignored
Attributes
----------
Methods
-------
See Also
--------
Notes
-----
Indexing starts at one for: layer, row, column, segment, reach.
Indexing starts at zero for: i, j, k, and kstpkper.
Examples
--------
>>> import flopy
>>> sfq = flopy.utils.SfrFile('mymodel.sfq')
"""
# non-float dtypes (default is float)
dtypes = {
"layer": int,
"row": int,
"column": int,
"segment": int,
"reach": int,
}
def __init__(self, filename, geometries=None, verbose=False):
"""
Class constructor.
"""
self.pd = import_optional_dependency("pandas")
# get the number of rows to skip at top, and the number of data columns
self.filename = filename
evaluated_format = False
has_gradient = False
has_delUzstor = False
has_elevation = False
with open(self.filename) as f:
for i, line in enumerate(f):
if "GRADIENT" in line:
has_gradient = True
if "CHNG. UNSAT." in line:
has_delUzstor = True
if "ELEVATION" in line:
has_elevation = True
items = line.strip().split()
if len(items) > 0 and items[0].isdigit():
evaluated_format = True
self.sr = i
self.ncol = len(items)
break
if not evaluated_format:
raise ValueError(
f"could not evaluate format of {self.filename!r} for SfrFile"
)
# all outputs start with the same 15 columns
self.names = [
"layer",
"row",
"column",
"segment",
"reach",
"Qin",
"Qaquifer",
"Qout",
"Qovr",
"Qprecip",
"Qet",
"stage",
"depth",
"width",
"Cond",
]
if has_gradient and has_delUzstor:
raise ValueError(
"column 16 should be either 'gradient' or 'Qwt', not both"
)
elif has_gradient:
self.names.append("gradient")
elif has_delUzstor:
self.names += ["Qwt", "delUzstor"]
if self.ncol == 18:
self.names.append("gw_head")
if has_elevation:
self.names.append("strtop")
self.times = self.get_times()
self.geoms = None # not implemented yet
self._df = None
[docs] def get_times(self):
"""
Parse the stress period/timestep headers.
Returns
-------
kstpkper : tuple
list of kstp, kper tuples
"""
kstpkper = []
with open(self.filename) as input:
for line in input:
if "STEP" in line:
line = line.strip().split()
kper, kstp = int(line[3]) - 1, int(line[5]) - 1
kstpkper.append((kstp, kper))
return kstpkper
@property
def df(self):
if self._df is None:
self._df = self.get_dataframe()
return self._df
[docs] @staticmethod
def get_nstrm(df):
"""
Get the number of SFR cells from the results dataframe.
Returns
-------
nrch : int
Number of SFR cells
"""
wherereach1 = np.where((df.segment == 1) & (df.reach == 1))[0]
if len(wherereach1) == 1:
return len(df)
elif len(wherereach1) > 1:
return wherereach1[1]
[docs] def get_dataframe(self):
"""
Read the whole text file into a pandas dataframe.
Returns
-------
df : pandas dataframe
SFR output as a pandas dataframe
"""
kwargs = {
"filepath_or_buffer": self.filename,
"delim_whitespace": True,
"header": None,
"names": self.names,
"skiprows": self.sr,
"low_memory": False,
}
try: # since pandas 1.3.0
df = self.pd.read_csv(**kwargs, on_bad_lines="skip")
except TypeError: # before pandas 1.3.0
df = self.pd.read_csv(**kwargs, error_bad_lines=False)
# drop text between stress periods; convert to numeric
df["layer"] = self.pd.to_numeric(df.layer, errors="coerce")
df.dropna(axis=0, inplace=True)
# convert to proper dtypes
for c in df.columns:
df[c] = df[c].astype(self.dtypes.get(c, float))
# add time, reachID, and reach geometry (if it exists)
self.nstrm = self.get_nstrm(df)
dftimes = []
times = self.get_times()
newper = df.segment.diff().fillna(0).values < 0
kstpkper = times.pop(0)
for per in newper:
if per:
kstpkper = times.pop(0)
dftimes.append(kstpkper)
df["kstpkper"] = dftimes
df["k"] = df["layer"] - 1
df["i"] = df["row"] - 1
df["j"] = df["column"] - 1
if self.geoms is not None:
geoms = self.geoms * self.nstrm
df["geometry"] = geoms
self._df = df
return df
def _get_result(self, segment, reach):
"""
Parameters
----------
segment : int or sequence of ints
Segment number for each location.
reach : int or sequence of ints
Reach number for each location
Returns
-------
"""
return self.df.loc[
(self.df.segment == segment) & (self.df.reach == reach)
].copy()
[docs] def get_results(self, segment, reach):
"""
Get results for a single reach or sequence of segments and reaches.
Parameters
----------
segment : int or sequence of ints
Segment number for each location.
reach : int or sequence of ints
Reach number for each location
Returns
-------
results : dataframe
Dataframe of same format as SfrFile.df, but subset to input locations.
"""
try:
segment = int(segment)
reach = int(reach)
results = self._get_result(segment, reach)
except:
locsr = list(zip(segment, reach))
results = self.pd.DataFrame()
for s, r in locsr:
srresults = self._get_result(s, r)
if len(srresults) > 0:
results = results.append(srresults)
else:
print(f"No results for segment {s}, reach {r}!")
return results