import re
import numpy as np
from ..pakbase import Package
from ..utils import MfList
from ..utils.flopy_io import line_parse, pop_item
from ..utils.recarray_utils import create_empty_recarray, recarray
class ModflowMnw1(Package):
    """
    MODFLOW Multi-Node Well 1 Package Class.

    Parameters
    ----------
    model : model object
        The model object (of type :class:`flopy.modflow.mf.Modflow`) to which
        this package will be added.
    mxmnw : integer
        maximum number of multi-node wells to be simulated
    ipakcb : int, optional
        Toggles whether cell-by-cell budget data should be saved. If None or
        zero, budget data will not be saved (default is None).
    iwelpt : integer
        verbosity flag
    nomoiter : integer
        the number of iterations for which flow in MNW wells is calculated
    kspref : string
        which set of water levels are to be used as reference values for
        calculating drawdown
    wel1_bynode_qsum : list of lists or None
        nested list containing file names, unit numbers, and ALLTIME flag for
        auxiliary output, e.g. [['test.ByNode',92,'ALLTIME']]
        if None, these optional external filenames and unit numbers are not
        written out
    losstype : string
        head loss type for each well; must be one of 'skin', 'linear' or
        'nonlinear' (case-insensitive)
    add : bool, list of bool, numpy array, or None
        flag to indicate whether the number of wells specified for a stress
        period should be added to the number of wells from previous stress
        periods (True) or if it is the total number of wells for that stress
        period (False). None becomes all-False and a single bool is repeated
        for every stress period.
    stress_period_data : dict or None
        well node data (lay, row, col, qdes, MN or MULTI flag, ...) handed
        to :class:`MfList`. :meth:`load` builds it as a dictionary keyed by
        zero-based stress period whose values are record arrays with the
        fields of :meth:`get_default_dtype`.
    dtype : numpy.dtype, optional
        dtype of the stress period data. If None, the result of
        :meth:`get_default_dtype` is used.
    mnwname : string
        prefix name of file for outputting time series data from MNW1
    extension : string
        Filename extension (default is 'mnw1')
    unitnumber : int
        File unit number. If None, the package default (33) is used.
    filenames : string or list of strings
        File name of the package (with extension) or a list with the filename
        of the package and the cell-by-cell budget file for ipakcb. Default
        is None.

    Attributes
    ----------

    Methods
    -------

    See Also
    --------

    Notes
    -----
    Parameters are not supported in FloPy.

    The functionality of the ADD flag in data set 4 is not supported. Also
    not supported are all water-quality parameters (Qwval Iqwgrp), water-level
    limitations (Hlim, Href, DD), non-linear well losses, and pumping
    limitations (QCUT, Q-%CUT, Qfrcmn, Qfrcmx, DEFAULT).

    Examples
    --------

    >>> import flopy
    >>> ml = flopy.modflow.Modflow()
    >>> mnw1 = flopy.modflow.ModflowMnw1(ml, ...)

    """

    def __init__(
        self,
        model,
        mxmnw=0,
        ipakcb=None,
        iwelpt=0,
        nomoiter=0,
        kspref=1,
        wel1_bynode_qsum=None,
        losstype="skin",
        add=None,
        stress_period_data=None,
        dtype=None,
        mnwname=None,
        extension="mnw1",
        unitnumber=None,
        filenames=None,
    ):
        # set default unit number if one is not specified
        if unitnumber is None:
            unitnumber = ModflowMnw1._defaultunit()

        # set filenames (package file and cell-by-cell budget file)
        filenames = self._prepare_filenames(filenames, 2)

        # cbc output file
        self.set_cbc_output_file(ipakcb, model, filenames[1])

        # call base package constructor
        super().__init__(
            model, extension, self._ftype(), unitnumber, filenames=filenames[0]
        )

        self.url = "mnw.html"
        self.nper = self.parent.nrow_ncol_nlay_nper[-1]
        self._generate_heading()

        # maximum number of multi-node wells to be simulated
        self.mxmnw = mxmnw
        # verbosity flag
        self.iwelpt = iwelpt
        # integer indicating the number of iterations for which flow in MNW
        # wells is calculated
        self.nomoiter = nomoiter
        # alphanumeric key indicating which set of water levels are to be used
        # as reference values for calculating drawdown
        self.kspref = kspref
        # string indicating head loss type for each well
        self.losstype = losstype
        # nested list containing file names, unit numbers, and ALLTIME flag
        # for auxiliary output, e.g. [['test.ByNode',92,'ALLTIME']]
        self.wel1_bynode_qsum = wel1_bynode_qsum

        # normalise the ADD flag to one entry per stress period; a list is
        # stored unchanged
        if add is None:
            add = [False] * self.nper
        elif isinstance(add, bool):
            add = [add] * self.nper
        elif isinstance(add, np.ndarray):
            add = add.tolist()
        self.add = add

        if dtype is not None:
            self.dtype = dtype
        else:
            self.dtype = self.get_default_dtype(structured=self.parent.structured)
        self.stress_period_data = MfList(self, stress_period_data)

        # string prefix name of file for outputting time series data from MNW1
        self.mnwname = mnwname

        # input format checks (kept as an assert so the exception type seen
        # by existing callers does not change)
        loss_types = ["skin", "linear", "nonlinear"]
        assert self.losstype.lower() in loss_types, (
            f"LOSSTYPE ({self.losstype}) must be one of the following: {loss_types}"
        )

        self.parent.add_package(self)

    @staticmethod
    def get_empty_stress_period_data(itmp, structured=True, default_value=0):
        """
        Get an empty record array that corresponds to the default dtype.

        Parameters
        ----------
        itmp : int
            passed to ``create_empty_recarray`` as the requested length
        structured : bool
            forwarded to :meth:`get_default_dtype` (default is True)
        default_value : int or float
            forwarded to ``create_empty_recarray`` (default is 0)

        Returns
        -------
        the array returned by ``create_empty_recarray``

        """
        dtype = ModflowMnw1.get_default_dtype(structured=structured)
        return create_empty_recarray(itmp, dtype, default_value=default_value)

    @staticmethod
    def get_default_dtype(structured=True):
        """
        Get the default dtype of the MNW1 stress period data.

        Parameters
        ----------
        structured : bool
            if True (default) the structured-grid dtype is returned

        Returns
        -------
        numpy.dtype or None
            None when ``structured`` is False; no dtype is defined here for
            that case.

        """
        if structured:
            return np.dtype(
                [
                    ("mnw_no", int),
                    ("k", int),
                    ("i", int),
                    ("j", int),
                    ("qdes", np.float32),
                    ("mntxt", object),
                    ("qwval", np.float32),
                    ("rw", np.float32),
                    ("skin", np.float32),
                    ("hlim", np.float32),
                    ("href", np.float32),
                    ("dd", object),
                    ("iqwgrp", object),
                    ("cpc", object),
                    ("qcut", object),
                    ("qfrcmn", np.float32),
                    ("qfrcmx", np.float32),
                    ("label", object),
                ]
            )
        # made explicit: the original fell off the end and returned None
        return None

    @classmethod
    def load(cls, f, model, nper=None, gwt=False, nsol=1, ext_unit_dict=None):
        """
        Load an existing MNW1 package file.

        Parameters
        ----------
        f : str or file handle
            anything without a ``read`` attribute is treated as a path and
            opened (and closed again) by this method; an open handle is read
            from but left open.
        model : model object
            the model the new package is attached to; ``model.verbose`` and
            ``model.structured`` are read.
        nper : int, optional
            number of stress periods to read. If None it is taken from
            ``model.get_nrow_ncol_nlay_nper()`` and a value of 0 is replaced
            by 1 so that the stress period loop runs at least once.
        gwt : bool
            not used by this method
        nsol : int
            not used by this method
        ext_unit_dict : dict, optional
            not used by this method

        Returns
        -------
        ModflowMnw1

        """
        if model.verbose:
            print("loading mnw1 package file...")

        structured = model.structured
        if nper is None:
            _, _, _, nper = model.get_nrow_ncol_nlay_nper()
            # otherwise iterations from 0, nper won't run
            nper = 1 if nper == 0 else nper

        openfile = not hasattr(f, "read")
        if openfile:
            filename = f
            f = open(filename, "r")

        # NOTE(review): skipcomments is neither imported nor defined in the
        # part of this module that was visible during review - confirm it is
        # in scope at runtime.
        try:
            # dataset 0 (header)
            line = skipcomments(next(f), f)

            # dataset 1
            mxmnw, ipakcb, iwelpt, nomoiter, kspref = _parse_1(line)

            # dataset 2
            line = skipcomments(next(f), f)
            losstype = _parse_2(line)

            # dataset 3: optional auxiliary output files, one per line
            wel1_bynode_qsum = []
            line = skipcomments(next(f), f)
            for txt in ["wel1", "bynode", "qsum"]:
                if txt in line.lower():
                    wel1_bynode_qsum.append(_parse_3(line, txt))
                    line = skipcomments(next(f), f)

            # dataset 4
            line = skipcomments(line, f)
            stress_period_data = {}
            dtype = ModflowMnw1.get_default_dtype(structured=structured)
            # defaults set by a DEFAULT keyword carry over between periods
            qfrcmn_default = None
            qfrcmx_default = None
            qcut_default = ""
            add = []
            for per in range(nper):
                if per > 0:
                    line = skipcomments(next(f), f)
                itmp = int(line_parse(line)[0])
                # the ADD flag is only honoured when wells are listed
                add.append(itmp >= 1 and "add" in line.lower())
                if itmp > 0:
                    # dataset 5
                    data, qfrcmn_default, qfrcmx_default, qcut_default = _parse_5(
                        f, itmp, qfrcmn_default, qfrcmx_default, qcut_default
                    )
                    # cast data (list) to recarray
                    tmp = recarray(data, dtype)
                    spd = ModflowMnw1.get_empty_stress_period_data(len(data))
                    for n in dtype.descr:
                        spd[n[0]] = tmp[n[0]]
                    stress_period_data[per] = spd
                else:
                    # NOTE(review): itmp < 0 also lands here and stores an
                    # empty array - confirm that "reuse previous period" is
                    # not the intended meaning of a negative itmp.
                    stress_period_data[per] = (
                        ModflowMnw1.get_empty_stress_period_data(0)
                    )
        finally:
            # close the handle we opened, even if parsing failed
            if openfile:
                f.close()

        return cls(
            model,
            mxmnw=mxmnw,
            ipakcb=ipakcb,
            iwelpt=iwelpt,
            nomoiter=nomoiter,
            kspref=kspref,
            wel1_bynode_qsum=wel1_bynode_qsum,
            losstype=losstype,
            add=add,
            stress_period_data=stress_period_data,
        )

    def write_file(self):
        """
        Write the package file.

        Returns
        -------
        None

        """
        # (file extension, keyword, whether an optional third item is written)
        # in the order sections 3a, 3b and 3c are written
        aux_sections = (
            ("wl1", "WEL1", False),
            ("bynode", "BYNODE", True),
            ("qsum", "QSUM", True),
        )

        # the context manager closes the file even if a write fails
        with open(self.fn_path, "w") as f:
            # -write header
            f.write(f"{self.heading}\n")

            # -Section 1 - MXMNW ipakcb IWELPT NOMOITER REF:kspref
            f.write(
                "%10i%10i%10i%10i REF = %s\n"
                % (self.mxmnw, self.ipakcb, self.iwelpt, self.nomoiter, self.kspref)
            )

            # -Section 2 - LOSSTYPE {PLossMNW}
            f.write(f"{self.losstype}\n")

            if self.wel1_bynode_qsum is not None:
                # -Section 3a - {FILE:filename WEL1:iunw1}
                # -Section 3b - {FILE:filename BYNODE:iunby} {ALLTIME}
                # -Section 3c - {FILE:filename QSUM:iunqs} {ALLTIME}
                for ext, keyword, takes_flag in aux_sections:
                    for each in self.wel1_bynode_qsum:
                        # the entry type is identified by the text between
                        # the first and second "." of the file name
                        if each[0].split(".")[1].lower() != ext:
                            continue
                        if not takes_flag or len(each) == 2:
                            f.write(
                                "FILE:%s %s:%-10i\n"
                                % (each[0], keyword, int(each[1]))
                            )
                        elif len(each) == 3:
                            f.write(
                                "FILE:%s %s:%-10i %s\n"
                                % (each[0], keyword, int(each[1]), each[2])
                            )

            # process additional data for Section 4 (itmp and ADD flag)
            additional_data = []
            for per in range(self.nper):
                if self.add[per]:
                    additional_data.append(" ADD ")
                else:
                    additional_data.append(" ")

            spd = self.stress_period_data.drop("mnw_no")
            # force write_transient to keep the list arrays internal because
            # MNW1 doesn't allow open/close
            spd.write_transient(
                f, forceInternal=True, additional_data=additional_data
            )

            # -Un-numbered section PREFIX:MNWNAME
            if self.mnwname:
                f.write(f"PREFIX:{self.mnwname}\n")

    @staticmethod
    def _ftype():
        """Return the file type string of this package."""
        return "MNW1"

    @staticmethod
    def _defaultunit():
        """Return the default file unit number of this package."""
        return 33
def _parse_1(line):
    """
    Parse dataset 1: MXMNW ipakcb IWELPT [NOMOITER] [REF... kspref].

    Parameters
    ----------
    line : str
        text of dataset 1

    Returns
    -------
    tuple
        (mnwmax, ipakcb, mnwprint, nomoiter, kspref). nomoiter defaults to 0
        and kspref to 1 when they are absent from the line.

    """
    tokens = line_parse(line)
    mnwmax = pop_item(tokens, int)
    ipakcb = pop_item(tokens, int)
    mnwprint = pop_item(tokens, int)

    nomoiter = 0
    kspref = 1

    # NOMOITER is optional: it is present only when the token that follows
    # the three leading integers is itself an integer. Look at the *next*
    # token (not the last one on the line) and tolerate a line that ends
    # after the third item.
    if tokens and tokens[0].isdigit():
        nomoiter = int(tokens.pop(0))

    # The reference keyword is matched case-insensitively and may be written
    # as e.g. "REF = 1", "ref=1" or "REFERENCE SP:1"; the first integer that
    # follows it is the reference stress period.
    remainder = " ".join(tokens).lower()
    if "ref" in remainder:
        digits = re.findall(r"\d+", remainder.split("ref", 1)[1])
        if digits:
            kspref = int(digits[0])

    return mnwmax, ipakcb, mnwprint, nomoiter, kspref
def _parse_2(line):
line = line.split("!!")[0]
options = ["SKIN", "NONLINEAR", "LINEAR"]
losstype = "skin"
for lt in options:
if lt.lower() in line.lower():
losstype = lt.lower()
return losstype
def _parse_3(line, txt):
    """
    Parse one auxiliary output line of dataset 3.

    The line is lower-cased before it is split, so every returned item is
    lower case.

    Parameters
    ----------
    line : str
        text of the dataset 3 line
    txt : str
        keyword that precedes the second item on the line (the caller in
        this module passes "wel1", "bynode" or "qsum")

    Returns
    -------
    list of str
        [first token without "file:", second token without "<txt>:"], with
        "alltime" appended when that word occurs in the remaining tokens.

    """
    tokens = line_parse(line.lower())
    # the first two tokens are taken positionally; their prefixes are
    # stripped if present
    first = tokens.pop(0).replace("file:", "").strip()
    second = tokens.pop(0).replace(txt + ":", "").strip()
    parsed = [first, second]
    leftover = " ".join(tokens)
    if "alltime" in leftover:
        parsed.append("alltime")
    return parsed
def _parse_5(f, itmp, qfrcmn_default=None, qfrcmx_default=None, qcut_default=""):
    """
    Parse dataset 5 (the well node records) for one stress period.

    Parameters
    ----------
    f : iterator of str
        open package file positioned at the first record; ``next(f)`` is
        called once per record (more if comment lines are skipped)
    itmp : int
        number of records to read
    qfrcmn_default : float or None
        qfrcmn applied to records without a QCUT/%CUT entry; only used when
        qfrcmx_default is also not None
    qfrcmx_default : float or None
        qfrcmx applied to records without a QCUT/%CUT entry; only used when
        qfrcmn_default is also not None
    qcut_default : str
        qcut text applied together with the two defaults above

    Returns
    -------
    tuple
        (data, qfrcmn_default, qfrcmx_default, qcut_default). ``data`` is a
        list with one list per record, ordered as the fields of
        ``ModflowMnw1.get_default_dtype``. The three defaults are returned
        so that values set by a DEFAULT keyword can be passed to the next
        call.

    """
    data = []
    mnw_no = 0
    mn = False
    multi = False
    label = ""
    for n in range(itmp):
        # NOTE(review): skipcomments is neither imported nor defined in the
        # part of this module that was visible during review - confirm it is
        # in scope at runtime.
        linetxt = skipcomments(next(f), f).lower()
        line = line_parse(linetxt)

        # get the label; strip it out
        if "site:" in linetxt:
            label = linetxt.replace(",", " ").split("site:")[1].split()[0]
            label = "site:" + label
            txt = [t for t in line if "site:" in t]
            if len(txt) > 0:  # site: might have been in the comments section
                line.remove(txt[0])

        # layer, row and column are converted to zero-based indices
        k = pop_item(line, int) - 1
        i = pop_item(line, int) - 1
        j = pop_item(line, int) - 1
        qdes = pop_item(line, float)

        # logic to create column of unique numbers for each MNW
        mntxt = ""
        if "mn" in line:
            if not mn:
                mnw_no -= 1  # this node has same number as previous
                if label == "":
                    # inherit the label of the previous record
                    label = data[n - 1][-1]
                mn = True
            mntxt = "mn"
            line.remove("mn")
        if "multi" in line:
            multi = True
            mntxt = "multi"
            line.remove("multi")
        if mn and not multi:
            multi = True

        # "The alphanumeric flags MN and DD can appear anywhere
        # between columns 41 and 256, inclusive."
        dd = ""
        if "dd" in line:
            line.remove("dd")
            dd = "dd"

        qwval = pop_item(line, float)
        rw = pop_item(line, float)
        skin = pop_item(line, float)
        hlim = pop_item(line, float)
        href = pop_item(line, float)
        iqwgrp = pop_item(line)

        cpc = ""
        if "cp:" in linetxt:
            token = line.pop(0)
            if re.findall(r"\d+", token):
                # the value is attached to the keyword ("cp:<value>").
                # re.findall returns a list, which cannot be concatenated
                # with "cp:" below, so keep the token text after the prefix.
                cpc = token.replace("cp:", "")
            else:
                # in case there is whitespace between cp: and the value
                cpc = pop_item(line)
            cpc = "cp:" + cpc

        qcut = ""
        qfrcmn = 0.0
        qfrcmx = 0.0
        if "qcut" in linetxt:
            txt = next(t for t in line if "qcut" in t)
            qcut = txt
            line.remove(txt)
        elif "%cut" in linetxt:
            txt = next(t for t in line if "%cut" in t)
            qcut = txt
            line.remove(txt)
        if "qcut" in linetxt or "%cut" in linetxt:
            qfrcmn = pop_item(line, float)
            qfrcmx = pop_item(line, float)
        elif qfrcmn_default is not None and qfrcmx_default is not None:
            qfrcmn = qfrcmn_default
            qfrcmx = qfrcmx_default
            if "qcut" not in linetxt and "%cut" not in linetxt:
                qcut = qcut_default
        if "default" in line:
            # remember this record's pumping limits for later records
            qfrcmn_default = qfrcmn
            qfrcmx_default = qfrcmx
            qcut_default = qcut

        idata = [
            mnw_no,
            k,
            i,
            j,
            qdes,
            mntxt,
            qwval,
            rw,
            skin,
            hlim,
            href,
            dd,
            iqwgrp,
            cpc,
            qcut,
            qfrcmn,
            qfrcmx,
            label,
        ]
        data.append(idata)

        # reset MNW designators
        # if at the end of the well
        if mn and multi:
            mnw_no += 1
            mn = False
            multi = False
            label = ""
        elif not mn and not multi:
            mnw_no += 1
            label = ""
    return data, qfrcmn_default, qfrcmx_default, qcut_default
def _write_5(f, spd):
f.write("{:d} {:d} {:d} {}")
pass