import copy
import warnings
import matplotlib.colors
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from matplotlib.patches import Polygon
from numpy.lib.recfunctions import stack_arrays
from ..utils import geometry, import_optional_dependency
from ..utils.geospatial_utils import GeoSpatialUtil
from . import plotutil
from .plotutil import to_mp7_endpoints, to_mp7_pathlines
warnings.simplefilter("always", PendingDeprecationWarning)
[docs]class PlotCrossSection:
"""
Class to create a cross sectional plot of a model.
Parameters
----------
ax : matplotlib.pyplot axis
The plot axis. If not provided it, plt.gca() will be used.
model : flopy.modflow object
flopy model object. (Default is None)
modelgrid : flopy.discretization.Grid object
can be a StructuredGrid, VertexGrid, or UnstructuredGrid object
line : dict
Dictionary with either "row", "column", or "line" key. If key
is "row" or "column" key value should be the zero-based row or
column index for cross-section. If key is "line" value should
be an array of (x, y) tuples with vertices of cross-section.
Vertices should be in map coordinates consistent with xul,
yul, and rotation.
extent : tuple of floats
(xmin, xmax, ymin, ymax) will be used to specify axes limits. If None
then these will be calculated based on grid, coordinates, and rotation.
geographic_coords : bool
boolean flag to allow the user to plot cross section lines in
geographic coordinates. If False (default), cross section is plotted
as the distance along the cross section line.
"""
def __init__(
self,
model=None,
modelgrid=None,
ax=None,
line=None,
extent=None,
geographic_coords=False,
):
self.ax = ax
self.geographic_coords = geographic_coords
self.model = model
if modelgrid is not None:
self.mg = modelgrid
elif model is not None:
self.mg = model.modelgrid
else:
raise Exception("Cannot find model grid")
if self.mg.top is None or self.mg.botm is None:
raise AssertionError("modelgrid top and botm must be defined")
if not isinstance(line, dict):
raise AssertionError("A line dictionary must be provided")
line = {k.lower(): v for k, v in line.items()}
if len(line) != 1:
s = (
"only row, column, or line can be specified in line "
"dictionary keys specified: "
)
for k in line.keys():
s += f"{k} "
raise AssertionError(s)
if ax is None:
self.ax = plt.gca()
else:
self.ax = ax
onkey = list(line.keys())[0]
self.__geographic_xpts = None
# un-translate model grid into model coordinates
xcellcenters, ycellcenters = geometry.transform(
self.mg.xcellcenters,
self.mg.ycellcenters,
self.mg.xoffset,
self.mg.yoffset,
self.mg.angrot_radians,
inverse=True,
)
xverts, yverts = self.mg.cross_section_vertices
(
xverts,
yverts,
) = plotutil.UnstructuredPlotUtilities.irregular_shape_patch(
xverts, yverts
)
self.xvertices, self.yvertices = geometry.transform(
xverts,
yverts,
self.mg.xoffset,
self.mg.yoffset,
self.mg.angrot_radians,
inverse=True,
)
if onkey in ("row", "column"):
eps = 1.0e-4
xedge, yedge = self.mg.xyedges
if onkey == "row":
self.direction = "x"
ycenter = ycellcenters.T[0]
pts = [
(xedge[0] - eps, ycenter[int(line[onkey])]),
(xedge[-1] + eps, ycenter[int(line[onkey])]),
]
else:
self.direction = "y"
xcenter = xcellcenters[0, :]
pts = [
(xcenter[int(line[onkey])], yedge[0] + eps),
(xcenter[int(line[onkey])], yedge[-1] - eps),
]
else:
ln = line[onkey]
if not PlotCrossSection._is_valid(ln):
raise ValueError("Invalid line representation")
gu = GeoSpatialUtil(ln, shapetype="linestring")
verts = gu.points
xp = []
yp = []
for [v1, v2] in verts:
xp.append(v1)
yp.append(v2)
xp, yp = self.mg.get_local_coords(xp, yp)
if np.max(xp) - np.min(xp) > np.max(yp) - np.min(yp):
# this is x-projection and we should buffer x by small amount
idx0 = np.argmax(xp)
idx1 = np.argmin(xp)
idx2 = np.argmax(yp)
xp[idx0] += 1e-04
xp[idx1] -= 1e-04
yp[idx2] += 1e-03
self.direction = "x"
else:
# this is y-projection and we should buffer y by small amount
idx0 = np.argmax(yp)
idx1 = np.argmin(yp)
idx2 = np.argmax(xp)
yp[idx0] += 1e-04
yp[idx1] -= 1e-04
xp[idx2] += 1e-03
self.direction = "y"
pts = [(xt, yt) for xt, yt in zip(xp, yp)]
self.pts = np.array(pts)
self.xypts = plotutil.UnstructuredPlotUtilities.line_intersect_grid(
self.pts, self.xvertices, self.yvertices
)
if len(self.xypts) < 2:
if len(list(self.xypts.values())[0]) < 2:
s = (
"cross-section cannot be created\n."
" less than 2 points intersect the model grid\n"
f" {len(self.xypts.values()[0])} points"
" intersect the grid."
)
raise Exception(s)
if self.geographic_coords:
# transform back to geographic coordinates
xypts = {}
for nn, pt in self.xypts.items():
xp = [t[0] for t in pt]
yp = [t[1] for t in pt]
xp, yp = geometry.transform(
xp,
yp,
self.mg.xoffset,
self.mg.yoffset,
self.mg.angrot_radians,
)
xypts[nn] = [(xt, yt) for xt, yt in zip(xp, yp)]
self.xypts = xypts
laycbd = []
self.ncb = 0
if self.model is not None:
if self.model.laycbd is not None:
laycbd = list(self.model.laycbd)
self.ncb = np.count_nonzero(laycbd)
if laycbd:
self.active = []
for k in range(self.mg.nlay):
self.active.append(1)
if laycbd[k] > 0:
self.active.append(0)
self.active = np.array(self.active, dtype=int)
else:
self.active = np.ones(self.mg.nlay, dtype=int)
self._nlay, self._ncpl, self.ncb = self.mg.cross_section_lay_ncpl_ncb(
self.ncb
)
top = self.mg.top.reshape(1, self._ncpl)
botm = self.mg.botm.reshape(self._nlay + self.ncb, self._ncpl)
self.elev = np.concatenate((top, botm), axis=0)
self.idomain = self.mg.idomain
if self.mg.idomain is None:
self.idomain = np.ones(botm.shape, dtype=int)
self.projpts = self.set_zpts(None)
# Create cross-section extent
if extent is None:
self.extent = self.get_extent()
else:
self.extent = extent
# this is actually x or y based on projection
self.xcenters = [
np.mean(np.array(v).T[0]) for i, v in sorted(self.projpts.items())
]
self.mean_dx = np.mean(
np.max(self.xvertices, axis=1) - np.min(self.xvertices, axis=1)
)
self.mean_dy = np.mean(
np.max(self.yvertices, axis=1) - np.min(self.yvertices, axis=1)
)
self._polygons = {}
if model is None:
self._masked_values = [1e30, -1e30]
else:
self._masked_values = [model.hnoflo, model.hdry]
# Set axis limits
self._set_axes_limits(self.ax)
@staticmethod
def _is_valid(line):
shapely_geo = import_optional_dependency("shapely.geometry")
if isinstance(
line,
(
list,
tuple,
np.ndarray,
),
):
a = np.array(line)
if (len(a.shape) < 2 or a.shape[0] < 2) or a.shape[1] != 2:
return False
elif not isinstance(
line,
(
geometry.LineString,
shapely_geo.LineString,
),
):
return False
return True
@property
def polygons(self):
"""
Method to return cached matplotlib polygons for a cross
section
Returns
-------
dict : [matplotlib.patches.Polygon]
"""
if not self._polygons:
for cell, poly in self.projpts.items():
if len(poly) > 4:
# this is the rare multipolygon instance...
n = 0
p = []
polys = []
for vn, v in enumerate(poly):
if vn == 3 + 4 * n:
n += 1
p.append(v)
polys.append(p)
p = []
else:
p.append(v)
else:
polys = [poly]
for polygon in polys:
verts = plotutil.UnstructuredPlotUtilities.arctan2(
np.array(polygon)
)
if cell not in self._polygons:
self._polygons[cell] = [Polygon(verts, closed=True)]
else:
self._polygons[cell].append(
Polygon(verts, closed=True)
)
return copy.copy(self._polygons)
[docs] def get_extent(self):
"""
Get the extent of the rotated and offset grid
Returns
-------
tuple : (xmin, xmax, ymin, ymax)
"""
xpts = []
for _, verts in self.projpts.items():
for v in verts:
xpts.append(v[0])
xmin = np.min(xpts)
xmax = np.max(xpts)
ymin = np.min(self.elev)
ymax = np.max(self.elev)
return xmin, xmax, ymin, ymax
def _set_axes_limits(self, ax):
"""
Internal method to set axes limits
Parameters
----------
ax : matplotlib.pyplot axis
The plot axis
Returns
-------
ax : matplotlib.pyplot axis object
"""
if ax.get_autoscale_on():
ax.set_xlim(self.extent[0], self.extent[1])
ax.set_ylim(self.extent[2], self.extent[3])
return ax
[docs] def plot_array(self, a, masked_values=None, head=None, **kwargs):
"""
Plot a three-dimensional array as a patch collection.
Parameters
----------
a : numpy.ndarray
Three-dimensional array to plot.
masked_values : iterable of floats, ints
Values to mask.
head : numpy.ndarray
Three-dimensional array to set top of patches to the minimum
of the top of a layer or the head value. Used to create
patches that conform to water-level elevations.
**kwargs : dictionary
keyword arguments passed to matplotlib.collections.PatchCollection
Returns
-------
patches : matplotlib.collections.PatchCollection
"""
ax = kwargs.pop("ax", self.ax)
if not isinstance(a, np.ndarray):
a = np.array(a)
if a.ndim > 1:
a = np.ravel(a)
a = a.astype(float)
if masked_values is not None:
self._masked_values.extend(list(masked_values))
for mval in self._masked_values:
a = np.ma.masked_values(a, mval)
if isinstance(head, np.ndarray):
projpts = self.set_zpts(np.ravel(head))
else:
projpts = None
pc = self.get_grid_patch_collection(a, projpts, **kwargs)
if pc is not None:
ax.add_collection(pc)
ax = self._set_axes_limits(ax)
return pc
[docs] def plot_surface(self, a, masked_values=None, **kwargs):
"""
Plot a two- or three-dimensional array as line(s).
Parameters
----------
a : numpy.ndarray
Two- or three-dimensional array to plot.
masked_values : iterable of floats, ints
Values to mask.
**kwargs : dictionary
keyword arguments passed to matplotlib.pyplot.plot
Returns
-------
plot : list containing matplotlib.plot objects
"""
ax = kwargs.pop("ax", self.ax)
color = kwargs.pop("color", "b")
color = kwargs.pop("c", color)
if not isinstance(a, np.ndarray):
a = np.array(a)
if a.ndim > 1:
a = np.ravel(a)
a = a.astype(float)
if a.size % self._ncpl != 0:
raise AssertionError("Array size must be a multiple of ncpl")
if masked_values is not None:
self._masked_values.extend(list(masked_values))
for mval in self._masked_values:
a = np.ma.masked_values(a, mval)
d = {
i: (np.min(np.array(v).T[0]), np.max(np.array(v).T[0]))
for i, v in sorted(self.projpts.items())
}
surface = []
for cell, val in d.items():
if cell >= a.size:
continue
elif np.isnan(a[cell]):
continue
elif a[cell] is np.ma.masked:
continue
else:
line = ax.plot(
d[cell], [a[cell], a[cell]], color=color, **kwargs
)
surface.append(line)
ax = self._set_axes_limits(ax)
return surface
[docs] def plot_fill_between(
self,
a,
colors=("blue", "red"),
masked_values=None,
head=None,
**kwargs,
):
"""
Plot a three-dimensional array as lines.
Parameters
----------
a : numpy.ndarray
Three-dimensional array to plot.
colors : list
matplotlib fill colors, two required
masked_values : iterable of floats, ints
Values to mask.
head : numpy.ndarray
Three-dimensional array to set top of patches to the minimum
of the top of a layer or the head value. Used to create
patches that conform to water-level elevations.
**kwargs : dictionary
keyword arguments passed to matplotlib.pyplot.plot
Returns
-------
plot : list containing matplotlib.fillbetween objects
"""
ax = kwargs.pop("ax", self.ax)
kwargs["colors"] = colors
if not isinstance(a, np.ndarray):
a = np.array(a)
a = np.ravel(a).astype(float)
if masked_values is not None:
self._masked_values.extend(list(masked_values))
for mval in self._masked_values:
a = np.ma.masked_values(a, mval)
if isinstance(head, np.ndarray):
projpts = self.set_zpts(head)
else:
projpts = self.projpts
pc = self.get_grid_patch_collection(
a, projpts, fill_between=True, **kwargs
)
if pc is not None:
ax.add_collection(pc)
ax = self._set_axes_limits(ax)
return pc
[docs] def contour_array(self, a, masked_values=None, head=None, **kwargs):
"""
Contour a two-dimensional array.
Parameters
----------
a : numpy.ndarray
Three-dimensional array to plot.
masked_values : iterable of floats, ints
Values to mask.
head : numpy.ndarray
Three-dimensional array to set top of patches to the minimum
of the top of a layer or the head value. Used to create
patches that conform to water-level elevations.
**kwargs : dictionary
keyword arguments passed to matplotlib.pyplot.contour
Returns
-------
contour_set : matplotlib.pyplot.contour
"""
import matplotlib.tri as tri
if not isinstance(a, np.ndarray):
a = np.array(a)
if a.ndim > 1:
a = np.ravel(a)
ax = kwargs.pop("ax", self.ax)
xcenters = self.xcenters
plotarray = np.array([a[cell] for cell in sorted(self.projpts)])
(
plotarray,
xcenters,
zcenters,
mplcontour,
) = self.mg.cross_section_set_contour_arrays(
plotarray, xcenters, head, self.elev, self.projpts
)
if not mplcontour:
if isinstance(head, np.ndarray):
zcenters = self.set_zcentergrid(np.ravel(head))
else:
zcenters = np.array(
[
np.mean(np.array(v).T[1])
for i, v in sorted(self.projpts.items())
]
)
# work around for tri-contour ignore vmin & vmax
# necessary for the tri-contour NaN issue fix
if "levels" not in kwargs:
vmin = kwargs.pop("vmin", np.nanmin(plotarray))
vmax = kwargs.pop("vmax", np.nanmax(plotarray))
levels = np.linspace(vmin, vmax, 7)
kwargs["levels"] = levels
# workaround for tri-contour nan issue
plotarray[np.isnan(plotarray)] = -(2**31)
if masked_values is None:
masked_values = [-(2**31)]
else:
masked_values = list(masked_values)
if -(2**31) not in masked_values:
masked_values.append(-(2**31))
ismasked = None
if masked_values is not None:
self._masked_values.extend(list(masked_values))
for mval in self._masked_values:
if ismasked is None:
ismasked = np.isclose(plotarray, mval)
else:
t = np.isclose(plotarray, mval)
ismasked += t
filled = kwargs.pop("filled", False)
plot_triplot = kwargs.pop("plot_triplot", False)
if "extent" in kwargs:
extent = kwargs.pop("extent")
idx = (
(xcenters >= extent[0])
& (xcenters <= extent[1])
& (zcenters >= extent[2])
& (zcenters <= extent[3])
)
plotarray = plotarray[idx].flatten()
xcenters = xcenters[idx].flatten()
zcenters = zcenters[idx].flatten()
if mplcontour:
plotarray = np.ma.masked_array(plotarray, ismasked)
if filled:
contour_set = ax.contourf(
xcenters, zcenters, plotarray, **kwargs
)
else:
contour_set = ax.contour(
xcenters, zcenters, plotarray, **kwargs
)
else:
triang = tri.Triangulation(xcenters, zcenters)
analyze = tri.TriAnalyzer(triang)
mask = analyze.get_flat_tri_mask(rescale=False)
if ismasked is not None:
ismasked = ismasked.flatten()
mask2 = np.any(
np.where(ismasked[triang.triangles], True, False), axis=1
)
mask[mask2] = True
triang.set_mask(mask)
if filled:
contour_set = ax.tricontourf(triang, plotarray, **kwargs)
else:
contour_set = ax.tricontour(triang, plotarray, **kwargs)
if plot_triplot:
ax.triplot(triang, color="black", marker="o", lw=0.75)
ax = self._set_axes_limits(ax)
return contour_set
[docs] def plot_inactive(self, ibound=None, color_noflow="black", **kwargs):
"""
Make a plot of inactive cells. If not specified, then pull ibound
from the self.ml
Parameters
----------
ibound : numpy.ndarray
ibound array to plot. (Default is ibound in 'BAS6' package.)
color_noflow : string
(Default is 'black')
Returns
-------
quadmesh : matplotlib.collections.QuadMesh
"""
if ibound is None:
if self.mg.idomain is None:
raise AssertionError("An idomain array must be provided")
else:
ibound = self.mg.idomain
plotarray = np.zeros(ibound.shape, dtype=int)
idx1 = ibound == 0
plotarray[idx1] = 1
plotarray = np.ma.masked_equal(plotarray, 0)
cmap = matplotlib.colors.ListedColormap(["0", color_noflow])
bounds = [0, 1, 2]
norm = matplotlib.colors.BoundaryNorm(bounds, cmap.N)
patches = self.plot_array(plotarray, cmap=cmap, norm=norm, **kwargs)
return patches
[docs] def plot_ibound(
self,
ibound=None,
color_noflow="black",
color_ch="blue",
color_vpt="red",
head=None,
**kwargs,
):
"""
Make a plot of ibound. If not specified, then pull ibound from the
self.model
Parameters
----------
ibound : numpy.ndarray
ibound array to plot. (Default is ibound in 'BAS6' package.)
color_noflow : string
(Default is 'black')
color_ch : string
Color for constant heads (Default is 'blue'.)
head : numpy.ndarray
Three-dimensional array to set top of patches to the minimum
of the top of a layer or the head value. Used to create
patches that conform to water-level elevations.
**kwargs : dictionary
keyword arguments passed to matplotlib.collections.PatchCollection
Returns
-------
patches : matplotlib.collections.PatchCollection
"""
if ibound is None:
if self.model is not None:
if self.model.version == "mf6":
color_ch = color_vpt
if self.mg.idomain is None:
raise AssertionError("Ibound/Idomain array must be provided")
ibound = self.mg.idomain
plotarray = np.zeros(ibound.shape, dtype=int)
idx1 = ibound == 0
idx2 = ibound < 0
plotarray[idx1] = 1
plotarray[idx2] = 2
plotarray = np.ma.masked_equal(plotarray, 0)
cmap = matplotlib.colors.ListedColormap(
["none", color_noflow, color_ch]
)
bounds = [0, 1, 2, 3]
norm = matplotlib.colors.BoundaryNorm(bounds, cmap.N)
# mask active cells
patches = self.plot_array(
plotarray,
masked_values=[0],
head=head,
cmap=cmap,
norm=norm,
**kwargs,
)
return patches
[docs] def plot_grid(self, **kwargs):
"""
Plot the grid lines.
Parameters
----------
kwargs : ax, colors. The remaining kwargs are passed into the
the LineCollection constructor.
Returns
-------
lc : matplotlib.collections.LineCollection
"""
ax = kwargs.pop("ax", self.ax)
col = self.get_grid_line_collection(**kwargs)
if col is not None:
ax.add_collection(col)
# ax.set_xlim(self.extent[0], self.extent[1])
# ax.set_ylim(self.extent[2], self.extent[3])
return col
[docs] def plot_bc(
self, name=None, package=None, kper=0, color=None, head=None, **kwargs
):
"""
Plot boundary conditions locations for a specific boundary
type from a flopy model
Parameters
----------
name : string
Package name string ('WEL', 'GHB', etc.). (Default is None)
package : flopy.modflow.Modflow package class instance
flopy package class instance. (Default is None)
kper : int
Stress period to plot
color : string
matplotlib color string. (Default is None)
head : numpy.ndarray
Three-dimensional array (structured grid) or
Two-dimensional array (vertex grid)
to set top of patches to the minimum of the top of a\
layer or the head value. Used to create
patches that conform to water-level elevations.
**kwargs : dictionary
keyword arguments passed to matplotlib.collections.PatchCollection
Returns
-------
patches : matplotlib.collections.PatchCollection
"""
if "ftype" in kwargs and name is None:
name = kwargs.pop("ftype")
# Find package to plot
if package is not None:
p = package
elif self.model is not None:
if name is None:
raise Exception("ftype not specified")
name = name.upper()
p = self.model.get_package(name)
else:
raise Exception("Cannot find package to plot")
# trap for mf6 'cellid' vs mf2005 'k', 'i', 'j' convention
if isinstance(p, list) or p.parent.version == "mf6":
if not isinstance(p, list):
p = [p]
idx = np.array([])
for pp in p:
if pp.package_type in ("lak", "sfr", "maw", "uzf"):
t = plotutil.advanced_package_bc_helper(pp, self.mg, kper)
else:
try:
mflist = pp.stress_period_data.array[kper]
except Exception as e:
raise Exception(
f"Not a list-style boundary package: {e!s}"
)
if mflist is None:
return
t = np.array(
[list(i) for i in mflist["cellid"]], dtype=int
).T
if len(idx) == 0:
idx = np.copy(t)
else:
idx = np.append(idx, t, axis=1)
else:
# modflow-2005 structured and unstructured grid
if p.package_type in ("uzf", "lak"):
idx = plotutil.advanced_package_bc_helper(p, self.mg, kper)
else:
try:
mflist = p.stress_period_data[kper]
except Exception as e:
raise Exception(
f"Not a list-style boundary package: {e!s}"
)
if mflist is None:
return
if len(self.mg.shape) == 3:
idx = [mflist["k"], mflist["i"], mflist["j"]]
else:
idx = mflist["node"]
if len(self.mg.shape) == 3:
plotarray = np.zeros(
(self.mg.nlay, self.mg.nrow, self.mg.ncol), dtype=int
)
plotarray[idx[0], idx[1], idx[2]] = 1
elif len(self.mg.shape) == 2:
plotarray = np.zeros((self._nlay, self._ncpl), dtype=int)
plotarray[tuple(idx)] = 1
else:
plotarray = np.zeros(self._ncpl, dtype=int)
idx = idx.flatten()
plotarray[idx] = 1
plotarray = np.ma.masked_equal(plotarray, 0)
if color is None:
key = name[:3].upper()
if key in plotutil.bc_color_dict:
c = plotutil.bc_color_dict[key]
else:
c = plotutil.bc_color_dict["default"]
else:
c = color
cmap = matplotlib.colors.ListedColormap(["none", c])
bounds = [0, 1, 2]
norm = matplotlib.colors.BoundaryNorm(bounds, cmap.N)
patches = self.plot_array(
plotarray,
masked_values=[0],
head=head,
cmap=cmap,
norm=norm,
**kwargs,
)
return patches
[docs] def plot_vector(
self,
vx,
vy,
vz,
head=None,
kstep=1,
hstep=1,
normalize=False,
masked_values=None,
**kwargs,
):
"""
Plot a vector.
Parameters
----------
vx : np.ndarray
x component of the vector to be plotted (non-rotated)
array shape must be (nlay, nrow, ncol) for a structured grid
array shape must be (nlay, ncpl) for a unstructured grid
vy : np.ndarray
y component of the vector to be plotted (non-rotated)
array shape must be (nlay, nrow, ncol) for a structured grid
array shape must be (nlay, ncpl) for a unstructured grid
vz : np.ndarray
y component of the vector to be plotted (non-rotated)
array shape must be (nlay, nrow, ncol) for a structured grid
array shape must be (nlay, ncpl) for a unstructured grid
head : numpy.ndarray
MODFLOW's head array. If not provided, then the quivers will be
plotted in the cell center.
kstep : int
layer frequency to plot (default is 1)
hstep : int
horizontal frequency to plot (default is 1)
normalize : bool
boolean flag used to determine if vectors should be normalized
using the vector magnitude in each cell (default is False)
masked_values : iterable of floats
values to mask
kwargs : matplotlib.pyplot keyword arguments for the
plt.quiver method
Returns
-------
quiver : matplotlib.pyplot.quiver
result of the quiver function
"""
ax = kwargs.pop("ax", self.ax)
pivot = kwargs.pop("pivot", "middle")
# Check that the cross section is not arbitrary with a tolerance
# of the mean cell size in each direction
arbitrary = False
pts = self.pts
xuniform = [
True if abs(pts.T[0, 0] - i) < self.mean_dy else False
for i in pts.T[0]
]
yuniform = [
True if abs(pts.T[1, 0] - i) < self.mean_dx else False
for i in pts.T[1]
]
if not np.all(xuniform) and not np.all(yuniform):
arbitrary = True
if arbitrary:
err_msg = (
"plot_specific_discharge() does not "
"support arbitrary cross-sections"
)
raise AssertionError(err_msg)
# get ibound array to mask inactive cells
ib = np.ones((self.mg.nnodes,), dtype=int)
if self.mg.idomain is not None:
ib = self.mg.idomain.ravel()
# get the actual values to plot and set xcenters
if self.direction == "x":
u_tmp = vx
else:
u_tmp = vy * -1.0
# kstep implementation for vertex grid
projpts = {
key: value
for key, value in self.projpts.items()
if (key // self._ncpl) % kstep == 0
}
# set x and z centers
if isinstance(head, np.ndarray):
# pipe kstep to set_zcentergrid to assure consistent array size
zcenters = self.set_zcentergrid(np.ravel(head), kstep=kstep)
else:
zcenters = [
np.mean(np.array(v).T[1]) for i, v in sorted(projpts.items())
]
xcenters = np.array(
[np.mean(np.array(v).T[0]) for i, v in sorted(projpts.items())]
)
x = np.ravel(xcenters)
z = np.ravel(zcenters)
u = np.array([u_tmp.ravel()[cell] for cell in sorted(projpts)])
v = np.array([vz.ravel()[cell] for cell in sorted(projpts)])
ib = np.array([ib[cell] for cell in sorted(projpts)])
x = x[::hstep]
z = z[::hstep]
u = u[::hstep]
v = v[::hstep]
ib = ib[::hstep]
# mask values
if masked_values is not None:
for mval in masked_values:
to_mask = np.logical_or(u == mval, v == mval)
u[to_mask] = np.nan
v[to_mask] = np.nan
# normalize
if normalize:
vmag = np.sqrt(u**2.0 + v**2.0)
idx = vmag > 0.0
u[idx] /= vmag[idx]
v[idx] /= vmag[idx]
# mask with an ibound array
u[ib == 0] = np.nan
v[ib == 0] = np.nan
# plot with quiver
quiver = ax.quiver(x, z, u, v, pivot=pivot, **kwargs)
return quiver
[docs] def plot_pathline(
self, pl, travel_time=None, method="cell", head=None, **kwargs
):
"""
Plot particle pathlines. Compatible with MODFLOW 6 PRT particle track
data format, or MODPATH 6 or 7 pathline data format.
Parameters
----------
pl : list of recarrays or dataframes, or a single recarray or dataframe
Particle pathline data. If a list of recarrays or dataframes,
each must contain the path of only a single particle. If just
one recarray or dataframe, it should contain the paths of all
particles. The flopy.utils.modpathfile.PathlineFile.get_data()
or get_alldata() return value may be passed directly as this
argument.
For MODPATH 6 or 7 pathlines, columns must include 'x', 'y', 'z',
'time', 'k', and 'particleid'. Additional columns are ignored.
For MODFLOW 6 PRT pathlines, columns must include 'x', 'y', 'z',
't', 'trelease', 'imdl', 'iprp', 'irpt', and 'ilay'. Additional
columns are ignored. Note that MODFLOW 6 PRT does not assign to
particles a unique ID, but infers particle identity from 'imdl',
'iprp', 'irpt', and 'trelease' combos (i.e. via composite key).
travel_time : float or str
travel_time is a travel time selection for the displayed
pathlines. If a float is passed then pathlines with times
less than or equal to the passed time are plotted. If a
string is passed a variety logical constraints can be added
in front of a time value to select pathlines for a select
period of time. Valid logical constraints are <=, <, ==, >=, and
>. For example, to select all pathlines less than 10000 days
travel_time='< 10000' would be passed to plot_pathline.
(default is None)
method : str
"cell" shows only pathlines that intersect with a cell
"all" projects all pathlines onto the cross section regardless
of whether they intersect with a given cell
head : np.ndarray
optional adjustment to only show pathlines that are <= to
the top of the water table given a user supplied head array
kwargs : layer, ax, colors. The remaining kwargs are passed
into the LineCollection constructor.
Returns
-------
lc : matplotlib.collections.LineCollection
The pathlines added to the plot.
"""
from matplotlib.collections import LineCollection
# make sure pl is a list
if not isinstance(pl, list):
if not isinstance(pl, (np.ndarray, pd.DataFrame)):
raise TypeError(
"Pathline data must be a list of recarrays or dataframes, "
f"or a single recarray or dataframe, got {type(pl)}"
)
pl = [pl]
# convert prt to mp7 format
pl = [
to_mp7_pathlines(
p.to_records(index=False) if isinstance(p, pd.DataFrame) else p
)
for p in pl
]
# merge pathlines then split on particleid
pls = stack_arrays(pl, asrecarray=True, usemask=False)
pids = np.unique(pls["particleid"])
pl = [pls[pls["particleid"] == pid] for pid in pids]
# configure plot settings
marker = kwargs.pop("marker", None)
markersize = kwargs.pop("markersize", None)
markersize = kwargs.pop("ms", markersize)
markercolor = kwargs.pop("markercolor", None)
markerevery = kwargs.pop("markerevery", 1)
ax = kwargs.pop("ax", self.ax)
if "colors" not in kwargs:
kwargs["colors"] = "0.5"
projpts = self.projpts
if head is not None:
projpts = self.set_zpts(head)
pl2 = []
for p in pl:
tp = plotutil.filter_modpath_by_travel_time(p, travel_time)
pl2.append(tp)
tp = plotutil.intersect_modpath_with_crosssection(
pl2,
projpts,
self.xvertices,
self.yvertices,
self.direction,
self._ncpl,
method=method,
)
plines = plotutil.reproject_modpath_to_crosssection(
tp,
projpts,
self.xypts,
self.direction,
self.mg,
self._ncpl,
self.geographic_coords,
)
# build linecollection and markers arrays
linecol = []
markers = []
for _, arr in plines.items():
arr = np.array(arr)
# sort by travel time
arr = arr[arr[:, -1].argsort()]
linecol.append(arr[:, :-1])
if marker is not None:
for xy in arr[::markerevery]:
markers.append(xy)
lc = None
if len(linecol) > 0:
lc = LineCollection(linecol, **kwargs)
ax.add_collection(lc)
if marker is not None:
markers = np.array(markers)
ax.plot(
markers[:, 0],
markers[:, 1],
lw=0,
marker=marker,
color=markercolor,
ms=markersize,
)
return lc
[docs] def plot_timeseries(
self, ts, travel_time=None, method="cell", head=None, **kwargs
):
"""
Plot the MODPATH timeseries. Not compatible with MODFLOW 6 PRT.
Parameters
----------
ts : list of rec arrays or a single rec array
rec array or list of rec arrays is data returned from
modpathfile TimeseriesFile get_data() or get_alldata()
methods. Data in rec array is 'x', 'y', 'z', 'time',
'k', and 'particleid'.
travel_time : float or str
travel_time is a travel time selection for the displayed
pathlines. If a float is passed then pathlines with times
less than or equal to the passed time are plotted. If a
string is passed a variety logical constraints can be added
in front of a time value to select pathlines for a select
period of time. Valid logical constraints are <=, <, ==, >=, and
>. For example, to select all pathlines less than 10000 days
travel_time='< 10000' would be passed to plot_pathline.
(default is None)
kwargs : layer, ax, colors. The remaining kwargs are passed
into the LineCollection constructor. If layer='all',
pathlines are output for all layers
Returns
-------
lo : list of Line2D objects
"""
if "color" in kwargs:
kwargs["markercolor"] = kwargs["color"]
return self.plot_pathline(
ts, travel_time=travel_time, method=method, head=head, **kwargs
)
[docs] def plot_endpoint(
self,
ep,
direction="ending",
selection=None,
selection_direction=None,
method="cell",
head=None,
**kwargs,
):
"""
Plot particle endpoints. Compatible with MODFLOW 6 PRT particle
track data format, or MODPATH 6 or 7 endpoint data format.
Parameters
----------
ep : recarray or dataframe
A numpy recarray with the endpoint particle data from the
MODPATH endpoint file.
For MODFLOW 6 PRT pathlines, columns must include 'x', 'y', 'z',
't', 'trelease', 'imdl', 'iprp', 'irpt', and 'ilay'. Additional
columns are ignored. Note that MODFLOW 6 PRT does not assign to
particles a unique ID, but infers particle identity from 'imdl',
'iprp', 'irpt', and 'trelease' combos (i.e. via composite key).
direction : str
String defining if starting or ending particle locations should be
considered. (default is 'ending')
selection : tuple
tuple that defines the zero-base layer, row, column location
(l, r, c) to use to make a selection of particle endpoints.
The selection could be a well location to determine capture zone
for the well. If selection is None, all particle endpoints for
the user-sepcified direction will be plotted. (default is None)
selection_direction : str
String defining is a selection should be made on starting or
ending particle locations. If selection is not None and
selection_direction is None, the selection direction will be set
to the opposite of direction. (default is None)
kwargs : ax, c, s or size, colorbar, colorbar_label, shrink. The
remaining kwargs are passed into the matplotlib scatter
method. If colorbar is True a colorbar will be added to the plot.
If colorbar_label is passed in and colorbar is True then
colorbar_label will be passed to the colorbar set_label()
method. If shrink is passed in and colorbar is True then
the colorbar size will be set using shrink.
Returns
-------
sp : matplotlib.collections.PathCollection
The PathCollection added to the plot.
"""
# convert ep to recarray if needed
if isinstance(ep, pd.DataFrame):
ep = ep.to_records(index=False)
# convert ep from prt to mp7 format if needed
if "t" in ep.dtype.names:
from .plotutil import to_mp7_endpoints
ep = to_mp7_endpoints(ep)
# configure plot settings
ax = kwargs.pop("ax", self.ax)
createcb = kwargs.pop("colorbar", False)
colorbar_label = kwargs.pop("colorbar_label", "Endpoint Time")
shrink = float(kwargs.pop("shrink", 1.0))
s = kwargs.pop("s", np.sqrt(50))
s = float(kwargs.pop("size", s)) ** 2.0
cd = {}
if "c" not in kwargs:
vmin, vmax = 1e10, -1e10
for rec in ep:
tt = float(rec["time"] - rec["time0"])
if tt < vmin:
vmin = tt
if tt > vmax:
vmax = tt
cd[int(rec["particleid"])] = tt
kwargs["vmin"] = vmin
kwargs["vmax"] = vmax
else:
tc = kwargs.pop("c")
for rec in ep:
cd[int(rec["praticleid"])] = tc
tep, istart = plotutil.parse_modpath_selection_options(
ep, direction, selection, selection_direction
)[0:2]
projpts = self.projpts
if head is not None:
projpts = self.set_zpts(head)
tep = plotutil.intersect_modpath_with_crosssection(
tep,
projpts,
self.xvertices,
self.yvertices,
self.direction,
method=method,
starting=istart,
)
if not tep:
return
epdict = plotutil.reproject_modpath_to_crosssection(
tep,
projpts,
self.xypts,
self.direction,
self.mg,
self.geographic_coords,
starting=istart,
)
arr = []
c = []
for node, epl in sorted(epdict.items()):
c.append(cd[node])
for xy in epl:
arr.append(xy)
arr = np.array(arr)
sp = ax.scatter(arr[:, 0], arr[:, 1], c=c, s=s, **kwargs)
# add a colorbar for travel times
if createcb:
cb = plt.colorbar(sp, ax=ax, shrink=shrink)
cb.set_label(colorbar_label)
return sp
[docs] def get_grid_line_collection(self, **kwargs):
"""
Get a PatchCollection of the grid
Parameters
----------
**kwargs : dictionary
keyword arguments passed to matplotlib.collections.LineCollection
Returns
-------
PatchCollection : matplotlib.collections.LineCollection
"""
from matplotlib.collections import PatchCollection
edgecolor = kwargs.pop("colors", "grey")
edgecolor = kwargs.pop("color", edgecolor)
edgecolor = kwargs.pop("ec", edgecolor)
edgecolor = kwargs.pop("edgecolor", edgecolor)
facecolor = kwargs.pop("facecolor", "none")
facecolor = kwargs.pop("fc", facecolor)
polygons = [
p for _, polys in sorted(self.polygons.items()) for p in polys
]
if len(polygons) > 0:
patches = PatchCollection(
polygons, edgecolor=edgecolor, facecolor=facecolor, **kwargs
)
else:
patches = None
return patches
[docs] def set_zpts(self, vs):
"""
Get an array of projected vertices corrected with corrected
elevations based on minimum of cell elevation (self.elev) or
passed vs numpy.ndarray
Parameters
----------
vs : numpy.ndarray
Two-dimensional array to plot.
Returns
-------
zpts : dict
"""
# make vertex array based on projection direction
if vs is not None:
if not isinstance(vs, np.ndarray):
vs = np.array(vs)
if self.direction == "x":
xyix = 0
else:
xyix = -1
projpts = {}
nlay = self.mg.nlay + self.ncb
nodeskip = self.mg.cross_section_nodeskip(nlay, self.xypts)
cbcnt = 0
for k in range(1, nlay + 1):
if not self.active[k - 1]:
cbcnt += 1
continue
k, ns, ncbnn = self.mg.cross_section_adjust_indicies(k - 1, cbcnt)
top = self.elev[k - 1, :]
botm = self.elev[k, :]
d0 = 0
# trap to split multipolygons
xypts = []
for nn, verts in self.xypts.items():
if nn in nodeskip[ns - 1]:
continue
if len(verts) > 2:
i0 = 2
for ix in range(len(verts)):
if ix == i0 - 1:
xypts.append((nn, verts[i0 - 2 : i0]))
i0 += 2
else:
xypts.append((nn, verts))
xypts = sorted(xypts, key=lambda q: q[-1][xyix][xyix])
if self.direction == "y":
xypts = xypts[::-1]
for nn, verts in xypts:
if vs is None:
t = top[nn]
else:
t = vs[nn + ncbnn]
if np.isclose(t, -1e30):
t = botm[nn]
if t < botm[nn]:
t = botm[nn]
if top[nn] < t:
t = top[nn]
b = botm[nn]
if self.geographic_coords:
if self.direction == "x":
projt = [(v[0], t) for v in verts]
projb = [(v[0], b) for v in verts]
else:
projt = [(v[1], t) for v in verts]
projb = [(v[1], b) for v in verts]
else:
verts = np.array(verts).T
a2 = (np.max(verts[0]) - np.min(verts[0])) ** 2
b2 = (np.max(verts[1]) - np.min(verts[1])) ** 2
c = np.sqrt(a2 + b2)
d1 = d0 + c
projt = [(d0, t), (d1, t)]
projb = [(d0, b), (d1, b)]
d0 += c
projpt = projt + projb
node = nn + ncbnn
if node not in projpts:
projpts[node] = projpt
else:
projpts[node] += projpt
return projpts
[docs] def set_zcentergrid(self, vs, kstep=1):
"""
Get an array of z elevations at the center of a cell that is based
on minimum of cell top elevation (self.elev) or passed vs numpy.ndarray
Parameters
----------
vs : numpy.ndarray
Three-dimensional array to plot.
kstep : int
plotting layer interval
Returns
-------
zcentergrid : numpy.ndarray
"""
verts = self.set_zpts(vs)
zcenters = [
np.mean(np.array(v).T[1])
for i, v in sorted(verts.items())
if (i // self._ncpl) % kstep == 0
]
return zcenters
[docs] def get_grid_patch_collection(
self, plotarray, projpts=None, fill_between=False, **kwargs
):
"""
Get a PatchCollection of plotarray in unmasked cells
Parameters
----------
plotarray : numpy.ndarray
One-dimensional array to attach to the Patch Collection.
projpts : dict
dictionary defined by node number which contains model
patch vertices.
fill_between : bool
flag to create polygons that mimic the matplotlib fill between
method. Only used by the plot_fill_between method.
**kwargs : dictionary
keyword arguments passed to matplotlib.collections.PatchCollection
Returns
-------
patches : matplotlib.collections.PatchCollection
"""
from matplotlib.collections import PatchCollection
from matplotlib.patches import Polygon
use_cache = False
if projpts is None:
use_cache = True
projpts = self.polygons
vmin = kwargs.pop("vmin", None)
vmax = kwargs.pop("vmax", None)
match_original = False
if fill_between:
match_original = True
colors = kwargs.pop("colors")
rectcol = []
data = []
for cell, poly in sorted(projpts.items()):
if not use_cache:
if len(poly) > 4:
# multipolygon instance...
n = 0
p = []
polys = []
for vn, v in enumerate(poly):
if vn == 3 + 4 * n:
n += 1
p.append(v)
polys.append(p)
p = []
else:
p.append(v)
else:
polys = [poly]
else:
polys = poly
for polygon in polys:
if not use_cache:
polygon = plotutil.UnstructuredPlotUtilities.arctan2(
np.array(polygon)
)
if np.isnan(plotarray[cell]):
continue
elif plotarray[cell] is np.ma.masked:
continue
if use_cache:
rectcol.append(polygon)
elif fill_between:
x = list(set(np.array(polygon).T[0]))
y1 = np.max(np.array(polygon).T[1])
y = np.min(np.array(polygon).T[1])
v = plotarray[cell]
if v > y1:
v = y
if v < y:
v = y
p1 = [(x[0], y1), (x[1], y1), (x[1], v), (x[0], v)]
p2 = [(x[0], v), (x[1], v), (x[1], y), (x[0], y)]
rectcol.append(Polygon(p1, closed=True, color=colors[0]))
rectcol.append(Polygon(p2, closed=True, color=colors[1]))
else:
rectcol.append(Polygon(polygon, closed=True))
data.append(plotarray[cell])
if len(rectcol) > 0:
patches = PatchCollection(
rectcol, match_original=match_original, **kwargs
)
if not fill_between:
patches.set_array(np.array(data))
patches.set_clim(vmin, vmax)
else:
patches = None
return patches