Source code for flopy.modflow.mfhob

import numpy as np

from ..pakbase import Package
from ..utils.flopy_io import line_strip
from ..utils.recarray_utils import create_empty_recarray


[docs]class ModflowHob(Package): """ Head Observation package class Parameters ---------- iuhobsv : int unit number where output is saved. If iuhobsv is None, a unit number will be assigned (default is None). hobdry : float Value of the simulated equivalent written to the observation output file when the observation is omitted because a cell is dry (default is 0). tomulth : float Time step multiplier for head observations. The product of tomulth and toffset must produce a time value in units consistent with other model input. tomulth can be dimensionless or can be used to convert the units of toffset to the time unit used in the simulation (default is 1). obs_data : HeadObservation or list of HeadObservation instances A single HeadObservation instance or a list of HeadObservation instances containing all of the data for each observation. If obs_data is None a default HeadObservation with an observation in layer, row, column (0, 0, 0) and a head value of 0 at totim 0 will be created (default is None). hobname : str Name of head observation output file. If iuhobsv is greater than 0, and hobname is None, the model basename with a '.hob.out' extension will be used (default is None). extension : string Filename extension (default is hob) no_print : boolean When True or 1, a list of head observations will not be written to the Listing File (default is False) options : list of strings Package options (default is None). unitnumber : int File unit number (default is None) filenames : str or list of str Filenames to use for the package and the output files. If filenames is None the package name will be created using the model name and package extension and the hob output name will be created using the model name and .hob.out extension (for example, modflowtest.hob.out), if iuhobsv is a number greater than zero. If a single string is passed the package will be set to the string and hob output name will be created using the model name and .hob.out extension, if iuhobsv is a number greater than zero. To define the names for all package files (input and output) the length of the list of strings should be 2. Default is None. Attributes ---------- Methods ------- See Also -------- Notes Examples -------- >>> import flopy >>> model = flopy.modflow.Modflow() >>> dis = flopy.modflow.ModflowDis(model, nlay=1, nrow=11, ncol=11, nper=2, ... perlen=[1,1]) >>> tsd = [[1.,54.4], [2., 55.2]] >>> obsdata = flopy.modflow.HeadObservation(model, layer=0, row=5, ... column=5, time_series_data=tsd) >>> hob = flopy.modflow.ModflowHob(model, iuhobsv=51, hobdry=-9999., ... obs_data=obsdata) """ def __init__( self, model, iuhobsv=None, hobdry=0, tomulth=1.0, obs_data=None, hobname=None, extension="hob", no_print=False, options=None, unitnumber=None, filenames=None, ): # set default unit number of one is not specified if unitnumber is None: unitnumber = ModflowHob._defaultunit() # set filenames filenames = self._prepare_filenames(filenames, 2) # set filenames[1] to hobname if filenames[1] is not None if filenames[1] is None: if hobname is not None: filenames[1] = hobname if iuhobsv is not None: model.add_output_file( iuhobsv, fname=filenames[1], extension="hob.out", binflag=False, package=self._ftype(), ) else: iuhobsv = 0 # call base package constructor super().__init__( model, extension=extension, name=self._ftype(), unit_number=unitnumber, filenames=filenames[0], ) self.url = "hob.html" self._generate_heading() self.iuhobsv = iuhobsv self.hobdry = hobdry self.tomulth = tomulth # create default if obs_data is None: obs_data = HeadObservation(model) # make sure obs_data is a list if isinstance(obs_data, HeadObservation): obs_data = [obs_data] # set self.obs_data self.obs_data = obs_data self.no_print = no_print self.np = 0 if options is None: options = [] if self.no_print: options.append("NOPRINT") self.options = options # add checks for input compliance (obsnam length, etc.) self.parent.add_package(self) def _set_dimensions(self): """ Set the length of the obs_data list Returns ------- None """ # make sure each entry of obs_data list is a HeadObservation instance # and calculate nh, mobs, and maxm msg = "" self.nh = 0 self.mobs = 0 self.maxm = 0 for idx, obs in enumerate(self.obs_data): if not isinstance(obs, HeadObservation): msg += ( f"ModflowHob: obs_data entry {idx} " "is not a HeadObservation instance.\n" ) continue self.nh += obs.nobs if obs.multilayer: self.mobs += obs.nobs self.maxm = max(self.maxm, obs.maxm) if msg != "": raise ValueError(msg) return
[docs] def write_file(self): """ Write the package file Returns ------- None """ # determine the dimensions of HOB data self._set_dimensions() # open file for writing f = open(self.fn_path, "w") # write dataset 0 f.write(f"{self.heading}\n") # write dataset 1 f.write(f"{self.nh:10d}") f.write(f"{self.mobs:10d}") f.write(f"{self.maxm:10d}") f.write(f"{self.iuhobsv:10d}") f.write(f"{self.hobdry:10.4g}") if self.no_print or "NOPRINT" in self.options: f.write("{: >10}".format("NOPRINT")) f.write("\n") # write dataset 2 f.write(f"{self.tomulth:10.4g}\n") # write datasets 3-6 for idx, obs in enumerate(self.obs_data): # dataset 3 obsname = obs.obsname if isinstance(obsname, bytes): obsname = obsname.decode("utf-8") line = f"{obsname:12s} " layer = obs.layer if layer >= 0: layer += 1 line += f"{layer:10d} " line += f"{obs.row + 1:10d} " line += f"{obs.column + 1:10d} " irefsp = obs.irefsp if irefsp >= 0: irefsp += 1 line += f"{irefsp:10d} " if obs.nobs == 1: toffset = obs.time_series_data[0]["toffset"] hobs = obs.time_series_data[0]["hobs"] else: toffset = 0.0 hobs = 0.0 line += f"{toffset:20} " line += f"{obs.roff:10.4f} " line += f"{obs.coff:10.4f} " line += f"{hobs:10.4f} " line += f" # DATASET 3 - Observation {idx + 1}" f.write(f"{line}\n") # dataset 4 if len(obs.mlay.keys()) > 1: line = "" for key, value in iter(obs.mlay.items()): line += f"{key + 1:5d}{value:10.4f}" line += f" # DATASET 4 - Observation {idx + 1}" f.write(f"{line}\n") # dataset 5 if irefsp < 0: line = f"{obs.itt:10d}" line += 103 * " " line += f" # DATASET 5 - Observation {idx + 1}" f.write(f"{line}\n") # dataset 6: if obs.nobs > 1: for jdx, t in enumerate(obs.time_series_data): obsname = t["obsname"] if isinstance(obsname, bytes): obsname = obsname.decode("utf-8") line = f"{obsname:12s} " line += f"{t['irefsp'] + 1:10d} " line += f"{t['toffset']:20} " line += f"{t['hobs']:10.4f} " line += 55 * " " line += f" # DATASET 6 - Observation {idx + 1}.{jdx + 1}" f.write(f"{line}\n") # close the hob package file f.close() return
[docs] @classmethod def load(cls, f, model, ext_unit_dict=None, check=True): """ Load an existing package. Parameters ---------- f : filename or file handle File to load. model : model object The model object (of type :class:`flopy.modflow.mf.Modflow`) to which this package will be added. ext_unit_dict : dictionary, optional If the arrays in the file are specified using EXTERNAL, or older style array control records, then `f` should be a file handle. In this case ext_unit_dict is required, which can be constructed using the function :class:`flopy.utils.mfreadnam.parsenamefile`. check : boolean Check package data for common errors. (default True) Returns ------- hob : ModflowHob package object ModflowHob package object. Examples -------- >>> import flopy >>> m = flopy.modflow.Modflow() >>> hobs = flopy.modflow.ModflowHob.load('test.hob', m) """ if model.verbose: print("loading hob package file...") openfile = not hasattr(f, "read") if openfile: filename = f f = open(filename, "r") # dataset 0 -- header while True: line = f.readline() if line[0] != "#": break # read dataset 1 t = line_strip(line).split() nh = int(t[0]) iuhobsv = None hobdry = 0 if len(t) > 3: iuhobsv = int(t[3]) hobdry = float(t[4]) # read dataset 2 line = f.readline() t = line_strip(line).split() tomulth = float(t[0]) # read observation data obs_data = [] # read datasets 3-6 nobs = 0 # set to False for 1st call to ensure that totim cache is updated tmax = model.dis.get_final_totim() use_cached_totim = True while True: # read dataset 3 line = f.readline() t = line_strip(line).split() obsnam = t[0] layer = int(t[1]) row = int(t[2]) - 1 col = int(t[3]) - 1 irefsp0 = int(t[4]) toffset = float(t[5]) roff = float(t[6]) coff = float(t[7]) hob = float(t[8]) # read dataset 4 if multilayer obs if layer > 0: layer -= 1 mlay = {layer: 1.0} else: line = f.readline() t = line_strip(line).split() mlay = {} if len(t) >= abs(layer) * 2: for j in range(0, abs(layer) * 2, 2): k = int(t[j]) - 1 # catch case where the same layer is specified # more than once. In this case add previous # value to the current value keys = list(mlay.keys()) v = 0.0 if k in keys: v = mlay[k] mlay[k] = float(t[j + 1]) + v else: for j in range(abs(layer)): k = int(t[0]) - 1 keys = list(mlay.keys()) v = 0.0 if k in keys: v = mlay[k] mlay[k] = float(t[1]) + v if j != abs(layer) - 1: line = f.readline() t = line_strip(line).split() # reset layer layer = -len(list(mlay.keys())) # read datasets 5 & 6. Index loop variable if irefsp0 > 0: itt = 1 irefsp0 -= 1 totim = model.dis.get_totim_from_kper_toffset( irefsp0, toffset * tomulth, use_cached_totim ) names = [obsnam] tsd = [totim, hob] nobs += 1 else: names = [] tsd = [] # read data set 5 line = f.readline() t = line_strip(line).split() itt = int(t[0]) # dataset 6 for j in range(abs(irefsp0)): line = f.readline() t = line_strip(line).split() names.append(t[0]) irefsp = int(t[1]) - 1 toffset = float(t[2]) totim = model.dis.get_totim_from_kper_toffset( irefsp, toffset * tomulth, use_cached_totim ) hob = float(t[3]) tsd.append([totim, hob]) nobs += 1 obs_data.append( HeadObservation( model, tomulth=tomulth, layer=layer, row=row, column=col, roff=roff, coff=coff, obsname=obsnam, mlay=mlay, itt=itt, tmax=tmax, time_series_data=tsd, names=names, ) ) if nobs == nh: break if openfile: f.close() # set package unit number unitnumber = None filenames = [None, None] if ext_unit_dict is not None: unitnumber, filenames[0] = model.get_ext_dict_attr( ext_unit_dict, filetype=ModflowHob._ftype() ) if iuhobsv is not None: if iuhobsv > 0: iu, filenames[1] = model.get_ext_dict_attr( ext_unit_dict, unit=iuhobsv ) model.add_pop_key_list(iuhobsv) return cls( model, iuhobsv=iuhobsv, hobdry=hobdry, tomulth=tomulth, obs_data=obs_data, unitnumber=unitnumber, filenames=filenames, )
@staticmethod def _ftype(): return "HOB" @staticmethod def _defaultunit(): return 39
[docs]class HeadObservation: """ Create single HeadObservation instance from a time series array. A list of HeadObservation instances are passed to the ModflowHob package. Parameters ---------- tomulth : float Time-offset multiplier for head observations. Default is 1. obsname : string Observation name. Default is 'HOBS' layer : int The zero-based layer index of the cell in which the head observation is located. If layer is less than zero, hydraulic heads from multiple layers are combined to calculate a simulated value. The number of layers equals the absolute value of layer, or abs(layer). Default is 0. row : int The zero-based row index for the observation. Default is 0. column : int The zero-based column index of the observation. Default is 0. irefsp : int The zero-based stress period to which the observation time is referenced. roff : float Fractional offset from center of cell in Y direction (between rows). Default is 0. coff : float Fractional offset from center of cell in X direction (between columns). Default is 0. itt : int Flag that identifies whether head or head changes are used as observations. itt = 1 specified for heads and itt = 2 specified if initial value is head and subsequent changes in head. Only specified if irefsp is < 0. Default is 1. tmax : float Maximum simulation time calculated using get_final_totim function of ModflowDis. Added to avoid repetitive calls. mlay : dictionary of length (abs(irefsp)) Key represents zero-based layer numbers for multilayer observations and value represents the fractional value for each layer of multilayer observations. If mlay is None, a default mlay of {0: 1.} will be used (default is None). time_series_data : list or numpy array Two-dimensional list or numpy array containing the simulation time of the observation and the observed head [[totim, hob]]. If time_series_dataDefault is None, a default observation of 0. at totim 0. will be created (default is None). names : list List of specified observation names. If names is None, observation names will be automatically generated from obsname and the order of the timeseries data (default is None). Returns ------- obs : HeadObservation HeadObservation object. Examples -------- >>> import flopy >>> model = flopy.modflow.Modflow() >>> dis = flopy.modflow.ModflowDis(model, nlay=1, nrow=11, ncol=11, nper=2, ... perlen=[1,1]) >>> tsd = [[1.,54.4], [2., 55.2]] >>> obsdata = flopy.modflow.HeadObservation(model, layer=0, row=5, ... column=5, time_series_data=tsd) """ def __init__( self, model, tomulth=1.0, obsname="HOBS", layer=0, row=0, column=0, irefsp=None, roff=0.0, coff=0.0, itt=1, tmax=None, mlay=None, time_series_data=None, names=None, ): """ Object constructor """ if mlay is None: mlay = {0: 1.0} if time_series_data is None: time_series_data = [[0.0, 0.0]] if irefsp is None: if len(time_series_data) == 1: irefsp = 1 else: irefsp = -1 * len(time_series_data) # set class attributes self.obsname = obsname self.layer = layer self.row = row self.column = column self.irefsp = irefsp self.roff = roff self.coff = coff self.itt = itt self.mlay = mlay self.maxm = 0 # check if multilayer observation self.multilayer = False if len(self.mlay.keys()) > 1: self.maxm = len(self.mlay.keys()) self.multilayer = True tot = 0.0 for key, value in self.mlay.items(): tot += value if not (np.isclose(tot, 1.0, rtol=0)): raise ValueError( "sum of dataset 4 proportions must equal 1.0 - " "sum of dataset 4 proportions = {tot} for " "observation name {obsname}.".format( tot=tot, obsname=self.obsname ) ) # convert passed time_series_data to a numpy array if isinstance(time_series_data, list): time_series_data = np.array(time_series_data, dtype=float) # if a single observation is passed as a list reshape to a # two-dimensional numpy array if len(time_series_data.shape) == 1: time_series_data = np.reshape(time_series_data, (1, 2)) # find indices of time series data that are valid if tmax is None: tmax = model.dis.get_final_totim() keep_idx = time_series_data[:, 0] <= tmax time_series_data = time_series_data[keep_idx, :] # set the number of observations in this time series shape = time_series_data.shape self.nobs = shape[0] # construct names if not passed if names is None: if self.nobs == 1: names = [obsname] else: names = [] for idx in range(self.nobs): names.append(f"{obsname}.{idx + 1}") # make sure the length of names is greater than or equal to nobs else: if isinstance(names, str): names = [names] elif not isinstance(names, list): raise ValueError( "HeadObservation names must be a " "string or a list of strings" ) if len(names) < self.nobs: raise ValueError( "a name must be specified for every valid observation " "- {} names were passed but at least " "{} names are required.".format(len(names), self.nobs) ) # create time_series_data self.time_series_data = self._get_empty(ncells=shape[0]) for idx in range(self.nobs): t = time_series_data[idx, 0] kstp, kper, toffset = model.dis.get_kstp_kper_toffset( t, use_cached_totim=True ) self.time_series_data[idx]["totim"] = t self.time_series_data[idx]["irefsp"] = kper self.time_series_data[idx]["toffset"] = toffset / tomulth self.time_series_data[idx]["hobs"] = time_series_data[idx, 1] self.time_series_data[idx]["obsname"] = names[idx] if self.nobs > 1: self.irefsp = -self.nobs else: self.irefsp = self.time_series_data[0]["irefsp"] def _get_empty(self, ncells=0): """ Get an empty time_series_data recarray for a HeadObservation Parameters ---------- ncells : int number of time entries in a HeadObservation Returns ------- d : np.recarray """ # get an empty recarray that corresponds to dtype dtype = self._get_dtype() d = create_empty_recarray(ncells, dtype, default_value=-1.0e10) d["obsname"] = "" return d def _get_dtype(self): """ Get the dtype for HeadObservation time_series_data Returns ------- dtype : np.dtype """ # get the default HOB dtype dtype = np.dtype( [ ("totim", np.float32), ("irefsp", int), ("toffset", np.float32), ("hobs", np.float32), ("obsname", "|S12"), ] ) return dtype