import os
from ...mbase import ModelInterface
from ...pakbase import PackageInterface
from ...utils import (
CellBudgetFile,
HeadFile,
Mf6ListBudget,
Mf6Obs,
ZoneBudget6,
ZoneFile6,
)
from ...utils.observationfile import CsvFile
[docs]class MF6Output:
"""
A class that uses meta programming to get output
Parameters
----------
obj : PackageInterface object
"""
def __init__(self, obj, budgetkey=None):
from ..modflow import ModflowGwfoc, ModflowGwtoc, ModflowUtlobs
# set initial observation definitions
methods = {
"budget": self.__budget,
"budgetcsv": self.__budgetcsv,
"zonebudget": self.__zonebudget,
"obs": self.__obs,
"csv": self.__csv,
"package_convergence": self.__csv,
}
delist = ("ts", "wc")
self._obj = obj
self._methods = []
self._sim_ws = obj.simulation_data.mfpath.get_sim_path()
self._budgetkey = (
"VOLUME BUDGET FOR ENTIRE MODEL"
if budgetkey is None
else budgetkey
)
self.__budgetcsv = False
if not isinstance(obj, (PackageInterface, ModelInterface)):
raise TypeError("Only mf6 PackageInterface types can be used")
# capture the list file for Models and for OC packages
if isinstance(obj, (ModelInterface, ModflowGwfoc, ModflowGwtoc)):
if isinstance(obj, ModelInterface):
self._model = obj
else:
self._model = obj.model_or_sim
self._mtype = self._model.model_type
nam_file = self._model.model_nam_file[:-4]
self._lst = (
self._model.name_file.blocks["options"].datasets["list"].array
)
if self._lst is None:
self._lst = f"{nam_file}.lst"
setattr(self, "list", self.__list)
self._methods.append("list()")
if isinstance(obj, ModelInterface):
return
else:
if obj.model_or_sim.type == "Model":
self._model = obj.model_or_sim
else:
self._model = None
obspkg = False
if isinstance(obj, ModflowUtlobs):
# this is a package
obspkg = True
rectype = "obs"
layerfiles = {}
if not obspkg:
# skim through the dfn file
try:
datasets = obj.blocks["options"].datasets
except KeyError:
return
for key, value in datasets.items():
if "_filerecord" in key:
tmp = key.split("_")
if tmp[0] in methods:
rectype = tmp[0]
else:
rectype = "_".join(tmp[:-1])
data = value.array
if rectype in delist:
continue
else:
if rectype not in methods:
layerfiles[rectype] = data
else:
setattr(self, rectype, methods[rectype])
if rectype == "budget":
setattr(
self, "zonebudget", methods["zonebudget"]
)
self._methods.append("zonebudget()")
elif rectype == "budgetcsv":
self.__budgetcsv = True
self._methods.append(f"{rectype}()")
if rectype == "obs":
data = None
for ky in obj._simulation_data.mfdata:
if obj.path == (ky[0:2]):
if str(ky[-2]).lower() == "fileout":
data = [[ky[-1]]]
break
elif (
str(ky[-3]) == "continuous"
and str(ky[-1]) == "output"
):
if (
obj._simulation_data.mfdata[
ky
].array[0][0]
== "fileout"
):
data = [
[
obj._simulation_data.mfdata[
ky
].array[0][-2]
]
]
break
if rectype == "package_convergence":
rectype = "csv"
attr_name = f"_{rectype}"
# need a check for obs....
if data is not None:
if not hasattr(self, attr_name):
setattr(self, attr_name, [data[0][0]])
else:
attr = getattr(self, attr_name)
if attr is None:
attr = [data[0][0]]
else:
attr.append(data[0][0])
setattr(self, attr_name, attr)
else:
setattr(self, attr_name, data)
else:
setattr(self, rectype, methods[rectype])
self._methods.append(f"{rectype}()")
data = obj.data_list[2].data
for f in data.keys():
attr_name = f"_{rectype}"
if not hasattr(self, attr_name):
setattr(self, attr_name, [f])
else:
attr = getattr(self, attr_name)
if attr is None:
attr = [f]
else:
attr.append(f)
setattr(self, attr_name, attr)
if layerfiles:
for rectype, data in layerfiles.items():
if data is not None:
data = data[0][0]
def get_layerfile_data(self, f=data, text=rectype):
"""
Method to get data from a binary layer file
Parameters
----------
self : MetaMF6Output
placeholder for the self attr after setting
as an attribute of the base class.
f : str
model head or other layer file
text : str
layer file header search string
Returns
-------
HeadFile object
"""
if f is not None:
try:
f = os.path.join(self._sim_ws, f)
return HeadFile(f, text=text)
except OSError:
return
setattr(self.__class__, rectype, get_layerfile_data)
self._methods.append(f"{rectype}()")
def __repr__(self):
"""
String representation of the MF6Output object
Returns
-------
s : str
human readable class representation
"""
name = self._obj.name
if isinstance(name, list):
name = name[0]
l = [
f"MF6Output Class for {name}",
"Available output methods include:",
]
l += [f".{m}" for m in self.methods()]
s = "\n".join(l)
return s
[docs] def methods(self):
"""
Method that returns a list of available method calls
Returns
-------
list
"""
if self._methods:
return self._methods
@property
def obs_names(self):
"""
Method to get obs file names
Returns
-------
list
"""
try:
return self._obs
except AttributeError:
return
@property
def csv_names(self):
"""
Method to get csv file names
Returns
-------
list
"""
try:
return self._csv
except AttributeError:
return
def __zonebudget(self, izone):
"""
Returns
-------
"""
budget = self.__budget()
grb = None
if budget is not None:
zonbud = ZoneBudget6(model_ws=self._sim_ws)
ZoneFile6(zonbud, izone)
zonbud.bud = budget
try:
if (
"gwf" in self._obj.model_or_sim.model_type
or "gwt" in self._obj.model_or_sim.model_type
):
if self._obj.package_type == "oc":
dis = self._obj.model_or_sim.dis
if (
dis.blocks["options"].datasets["nogrb"].array
is None
):
grb = os.path.join(
self._sim_ws, f"{dis.filename}.grb"
)
except AttributeError:
pass
zonbud.grb = grb
return zonbud
def __budgetcsv(self):
"""
Convenience method to open and return a budget csv object
Returns
-------
flopy.utils.CsvFile object
"""
return self.__csv(budget=True)
def __budget(self, precision="double"):
"""
Convenience method to open and return a budget object
Returns
-------
flopy.utils.CellBudgetFile object
"""
if self._budget is not None:
try:
budget_file = os.path.join(self._sim_ws, self._budget[0])
return CellBudgetFile(
budget_file,
precision=precision,
modelgrid=self._model.modelgrid,
)
except OSError:
return None
def __obs(self, f=None):
"""
Method to read and return obs files
Parameters
----------
f : str, None
observation file name, if None the first observation file
will be returned
Returns
-------
flopy.utils.Mf6Obs file object
"""
if self._obs is not None:
obs_file = self.__mulitfile_handler(f, self._obs)
try:
obs_file = os.path.join(self._sim_ws, obs_file)
return Mf6Obs(obs_file)
except OSError:
return None
def __csv(self, f=None, budget=False):
"""
Method to get csv file outputs
Parameters
----------
f : str
csv file name path
budget : bool
boolean flag to indicate budgetcsv file
Returns
-------
flopy.utils.CsvFile object
"""
if budget and self._budgetcsv is not None:
csv_file = self.__mulitfile_handler(f, self._budgetcsv)
elif self._csv is not None:
csv_file = self.__mulitfile_handler(f, self._csv)
else:
return
try:
csv_file = os.path.join(self._sim_ws, csv_file)
return CsvFile(csv_file)
except OSError:
return None
def __list(self):
"""
Method to read list files
Returns
-------
Mf6ListBudget object
"""
if self._lst is not None:
try:
list_file = os.path.join(self._sim_ws, self._lst)
return Mf6ListBudget(list_file, budgetkey=self._budgetkey)
except (AssertionError, OSError):
return None
def __mulitfile_handler(self, f, flist):
"""
Method to parse multiple output files of the same type
Parameters
----------
f : str
file name
flist : list
list of output file names
Returns
-------
file name string of valid file or first file is f is None
"""
if len(flist) > 1 and f is None:
print("Multiple csv files exist, selecting first")
filename = flist[0]
else:
if f is None:
filename = flist[0]
else:
idx = flist.index(f)
if idx is None:
err = f"File name not found, available files are {', '.join(flist)}"
raise FileNotFoundError(err)
else:
filename = flist[idx]
return filename