Source code for flopy.mf6.utils.binaryfile_utils

import os

import numpy as np
import pandas as pd

from ...utils import binaryfile as bf
from ...utils import import_optional_dependency


[docs]class MFOutput: """ Wrapper class for Binary Arrays. This class enables directly getting slices from the binary output. It is intended to be called from the __getitem__ method of the SimulationDict() class. Implemented to conserve memory. Parameters ---------- path: binary file path location mfdict: SimulationDict() object key: dict key ex. ('flow15','CBC','FLOW RIGHT FACE') Returns ------- Xarray of [n,n,n,n] dimension Usage: ----- >>> val = MFOutput(mfdict, path, key) >>> return val.data User interaction: ----------------- >>> data[('flow15','CBC','FLOW RIGHT FACE')][:,0,1,:] or >>> data[('flow15','CBC','FLOW RIGHT FACE')] """ def __init__(self, mfdict, path, key): self.mfdict = mfdict data = MFOutputRequester(mfdict, path, key) try: self.data = data.querybinarydata except AttributeError: self.data = np.array([[[[]]]]) def __iter__(self): yield self.data def __getitem__(self, index): self.data = self.data[index] return self.data
[docs]class MFOutputRequester: """ MFOutputRequest class is a helper function to enable the user to query binary data from the SimulationDict() object on the fly without actually storing it in the SimulationDict() object. Parameters: ---------- mfdict: dict local instance of the SimulationDict() object path: pointer to the MFSimulationPath object key: tuple user requested data key Methods: ------- MFOutputRequester.querybinarydata returns: Xarray object Examples: -------- >>> data = MFOutputRequester(mfdict, path, key) >>> data.querybinarydata """ def __init__(self, mfdict, path, key): self.path = path self.mfdict = mfdict self.dataDict = {} # get the binary file locations, create a dictionary key to look them # up from, store in self.dataDict self._getbinaryfilepaths() # check if supplied key exists, and model grid type if key in self.dataDict: if (key[0], "disv", "dimensions", "nvert") in self.mfdict: self.querybinarydata = self._querybinarydata_vertices( self.mfdict, key ) elif (key[0], "disu", "connectiondata", "iac") in self.mfdict: self.querybinarydata = self._querybinarydata_unstructured(key) else: self.querybinarydata = self._querybinarydata(key) elif key == ("model", "HDS", "IamAdummy"): pass else: print("\nValid Keys Are:\n") for valid_key in self.dataDict: print(valid_key) raise KeyError(f"Invalid key {key}") def _querybinarydata(self, key): # Basic definition to get output from modflow binary files for # simulations using a structured grid path = self.dataDict[key] bintype = key[1] bindata = self._get_binary_file_object(path, bintype, key) if bintype == "CBC": try: return np.array(bindata.get_data(text=key[-1], full3D=True)) except ValueError: # imeth == 6 return np.array(bindata.get_data(text=key[-1], full3D=False)) else: return np.array(bindata.get_alldata()) def _querybinarydata_vertices(self, mfdict, key): # Basic definition to get output data from binary output files for # simulations that define grid by vertices path = self.dataDict[key] bintype = key[1] bindata = self._get_binary_file_object(path, bintype, key) if bintype == "CBC": if key[-1] == "FLOW-JA-FACE": data = np.array(bindata.get_data(text=key[-1])) # uncomment line to remove extra dimensions from data # data data.shape = (len(times), -1) return data else: try: data = np.array( bindata.get_data(text=key[-1], full3D=True) ) except ValueError: # imeth == 6 data = np.array( bindata.get_data(text=key[-1], full3D=False) ) else: data = np.array(bindata.get_alldata()) # uncomment line to remove extra dimensions from data # data = _reshape_binary_data(data, 'V') return data def _querybinarydata_unstructured(self, key): # get unstructured binary data in numpy array format. path = self.dataDict[key] bintype = key[1] bindata = self._get_binary_file_object(path, bintype, key) if bintype == "CBC": try: data = np.array(bindata.get_data(text=key[-1], full3D=True)) except ValueError: data = np.array(bindata.get_data(text=key[-1], full3D=False)) else: data = bindata.get_alldata() # remove un-needed dimensions data = _reshape_binary_data(data, "U") if key[-1] == "FLOW-JA-FACE": return data else: return data def _get_binary_file_object(self, path, bintype, key): # simple method that tries to open the binary file object using Flopy if bintype == "CBC": try: return bf.CellBudgetFile(path, precision="double") except AssertionError: raise AssertionError(f"{self.dataDict[key]} does not exist") elif bintype == "HDS": try: return bf.HeadFile(path, precision="double") except AssertionError: raise AssertionError(f"{self.dataDict[key]} does not exist") elif bintype == "DDN": try: return bf.HeadFile(path, text="drawdown", precision="double") except AssertionError: raise AssertionError(f"{self.dataDict[key]} does not exist") elif bintype == "UCN": try: return bf.UcnFile(path, precision="single") except AssertionError: raise AssertionError(f"{self.dataDict[key]} does not exist") else: raise AssertionError() @staticmethod def _get_vertices(mfdict, key): """ Depreciated! Consider removing from code. Parameters ---------- key: binary query dictionary key Returns ------- information defining specified vertices for all model cells to be added to xarray as coordinates. cellid: (list) corresponds to the modflow CELL2d cell number xcyc: (n x 2) dimensional Pandas object of tuples defining the CELL2d center coordinates nverts: (list) number of xy vertices corresponding to a cell xv: (n x nverts) dimensional Pandas object of tuples. Contains x vertices for a cell yv: (n x nverts) dimensional Pandas object of tuples. Contains y vertices for a cell topv: (n x nlayers) dimensional Pandas object of cell top elevations corresponding to a row column location botmv: (n x nlayers) dimensional Pandas object of cell bottom elevations corresponding to a row column location """ mname = key[0] cellid = mfdict[(mname, "DISV8", "CELL2D", "cell2d_num")] cellxc = mfdict[(mname, "DISV8", "CELL2D", "xc")] cellyc = mfdict[(mname, "DISV8", "CELL2D", "yc")] xcyc = [(cellxc[i], cellyc[i]) for i in range(len(cellxc))] xcyc = pd.Series(xcyc, dtype="object") nverts = mfdict[(mname, "DISV8", "CELL2D", "nvert")] vertnums = mfdict[(mname, "DISV8", "CELL2D", "iv")] vertid = mfdict[(mname, "DISV8", "VERTICES", "vert_num")] vertx = mfdict[(mname, "DISV8", "VERTICES", "x")] verty = mfdict[(mname, "DISV8", "VERTICES", "y")] # get vertices that correspond to CellID list xv = [] yv = [] for line in vertnums: tempx = [] tempy = [] for vert in line: idx = vertid.index(vert) tempx.append(vertx[idx]) tempy.append(verty[idx]) xv.append(tempx) yv.append(tempy) xv = pd.Series(xv, dtype="object") yv = pd.Series(yv, dtype="object") top = np.array(mfdict[(mname, "DISV8", "CELLDATA", "top")]) botm = np.array(mfdict[(mname, "DISV8", "CELLDATA", "botm")]) top = top.tolist() botm = botm.tolist() # get cell top and bottom by layer topv = list(zip(top, *botm[:-1])) botmv = list(zip(*botm)) topv = pd.Series(topv, dtype="object") botmv = pd.Series(botmv, dtype="object") return cellid, xcyc, nverts, xv, yv, topv, botmv def _getbinaryfilepaths(self): # model paths self.modelpathdict = {} for i in self.path.model_relative_path: self.modelpathdict[i] = self.path.get_model_path(i) sim_path = self.path.get_sim_path() self.binarypathdict = {} # check output control to see if a binary file is supposed to exist. # Get path to that file for i in self.modelpathdict: if (i, "oc", "options", "budget_filerecord") in self.mfdict: cbc = self.mfdict[(i, "oc", "options", "budget_filerecord")] if cbc.get_data() is not None: self.binarypathdict[(i, "CBC")] = os.path.join( sim_path, cbc.get_data()[0][0] ) if (i, "oc", "options", "head_filerecord") in self.mfdict: hds = self.mfdict[(i, "oc", "options", "head_filerecord")] if hds.get_data() is not None: self.binarypathdict[(i, "HDS")] = os.path.join( sim_path, hds.get_data()[0][0] ) if (i, "oc", "options", "drawdown_filerecord") in self.mfdict: ddn = self.mfdict[(i, "oc", "options", "drawdown_filerecord")] if ddn.get_data() is not None: self.binarypathdict[(i, "DDN")] = os.path.join( sim_path, ddn.get_data()[0][0] ) self._setbinarykeys(self.binarypathdict) def _setbinarykeys(self, binarypathdict): # check that if a binary file is supposed to exist, it does, and create # a dictionary key to access that data for key in binarypathdict: path = binarypathdict[key] if key[1] == "CBC": try: readcbc = bf.CellBudgetFile(path, precision="double") for record in readcbc.get_unique_record_names(): name = record.decode("utf-8").strip(" ") # store keys along with model name in ordered dict? self.dataDict[(key[0], key[1], name)] = path readcbc.close() except: pass elif key[1] == "HDS": try: readhead = bf.HeadFile(path, precision="double") self.dataDict[(key[0], key[1], "HEAD")] = path readhead.close() except: pass elif key[1] == "DDN": try: readddn = bf.HeadFile( path, text="drawdown", precision="double" ) self.dataDict[(key[0], key[1], "DRAWDOWN")] = path readddn.close() except: pass elif key[1] == "UCN": try: readucn = bf.UcnFile(path, precision="single") self.dataDict[(key[0], key[1], "CONCENTRATION")] = path readucn.close() except: pass else: pass
[docs] @staticmethod def getkeys(mfdict, path, print_keys=True): # use a dummy key to get valid binary output keys dummy_key = ("model", "HDS", "IamAdummy") x = MFOutputRequester(mfdict, path, dummy_key) keys = [i for i in x.dataDict] if print_keys is True: for key in keys: print(key) return x
def _reshape_binary_data(data, dtype=None): # removes unnecessary dimensions from data returned by # flopy.utils.binaryfile time = len(data) data = np.array(data) if dtype is None: return data elif dtype == "V": nodes = len(data[0][0][0]) data.shape = (time, -1, nodes) elif dtype == "U": data.shape = (time, -1) else: err = "Invalid dtype flag supplied, valid are dtype='U', dtype='V'" raise Exception(err) return data