"""
Support for MODPATH output files.
"""
import itertools
import os
from typing import List, Optional, Tuple, Union
import numpy as np
from numpy.lib.recfunctions import append_fields, repack_fields
from flopy.utils.particletrackfile import ParticleTrackFile
from ..utils.flopy_io import loadtxt
class ModpathFile(ParticleTrackFile):
    """Provides MODPATH output file support."""

    def __init__(
        self, filename: Union[str, os.PathLike], verbose: bool = False
    ):
        super().__init__(filename, verbose)
        # derive the output type from the subclass name,
        # e.g. PathlineFile -> "pathline"
        self.output_type = self.__class__.__name__.lower().replace("file", "")
        (
            self.modpath,
            self.compact,
            self.skiprows,
            self.version,
            self.direction,
        ) = self.parse(filename, self.output_type)

    @staticmethod
    def parse(
        file_path: Union[str, os.PathLike], file_type: str
    ) -> Tuple[bool, bool, int, Optional[int], Optional[int]]:
        """
        Extract preliminary information from a MODPATH output file:

        - whether the file is a recognized MODPATH output file
        - whether in compact format (MODPATH 5 only)
        - how many header rows to skip
        - the MODPATH version
        - the tracking direction (endpoint files, versions 6/7 only)

        Parameters
        ----------
        file_path : str or PathLike
            The output file path
        file_type : str
            The output file type: pathline, endpoint, or timeseries

        Returns
        -------
        out : bool, bool, int, int or None, int or None
            A tuple (modpath, compact, skiprows, version, direction)
        """
        modpath = True
        compact = False
        idx = 0
        skiprows = 0
        version = None
        direction = None
        with open(file_path, "r") as f:
            while True:
                line = f.readline()
                if isinstance(line, bytes):
                    line = line.decode()
                # only the first line identifies the file version
                if skiprows < 1:
                    if f"MODPATH_{file_type.upper()}_FILE 6" in line.upper():
                        version = 6
                    elif (
                        f"MODPATH_{file_type.upper()}_FILE 7"
                        in line.upper()
                    ):
                        version = 7
                    elif "MODPATH 5.0" in line.upper():
                        version = 5
                        if "COMPACT" in line.upper():
                            compact = True
                    elif "MODPATH Version 3.00" in line.upper():
                        version = 3
                    if version is None:
                        # not a recognized MODPATH output file
                        modpath = False
                skiprows += 1
                if version in [6, 7]:
                    if file_type.lower() == "endpoint":
                        # second header line of an endpoint file starts
                        # with the tracking direction flag (2 = backward)
                        if idx == 1:
                            direction = 1
                            if int(line.strip()[0]) == 2:
                                direction = -1
                        idx += 1
                    if "end header" in line.lower():
                        break
                else:
                    # versions 3/5 have a fixed-size header; stop scanning
                    break
        return modpath, compact, skiprows, version, direction

    def intersect(
        self, cells, to_recarray
    ) -> Union[List[np.recarray], np.recarray]:
        """
        Select particle track data passing through ``cells``, given as
        (k, i, j) tuples for versions < 7 or node numbers for version 7.
        """
        if self.version < 7:
            try:
                raslice = self._data[["k", "i", "j"]]
            except (KeyError, ValueError):
                raise KeyError(
                    "could not extract 'k', 'i', and 'j' keys "
                    "from {} data".format(self.output_type.lower())
                )
        else:
            try:
                raslice = self._data[["node"]]
            except (KeyError, ValueError):
                msg = "could not extract 'node' key from {} data".format(
                    self.output_type.lower()
                )
                raise KeyError(msg)
        if isinstance(cells, (list, tuple)):
            allint = all(isinstance(el, int) for el in cells)
            # convert to a list of tuples
            if allint:
                t = []
                for el in cells:
                    t.append((el,))
                cells = t

        cells = np.array(cells, dtype=raslice.dtype)
        # NOTE(review): np.in1d is deprecated in favor of np.isin in newer
        # numpy — confirm minimum supported numpy version before switching
        inds = np.in1d(raslice, cells)
        epdest = self._data[inds].copy().view(np.recarray)

        if to_recarray:
            # use particle ids to get the rest of the paths
            inds = np.in1d(self._data["particleid"], epdest.particleid)
            series = self._data[inds].copy()
            series.sort(order=["particleid", "time"])
            series = series.view(np.recarray)
        else:
            # collect unique particleids in selection
            partids = np.unique(epdest["particleid"])
            series = [self.get_data(partid) for partid in partids]

        return series
class PathlineFile(ModpathFile):
    """
    Particle pathline file.

    Parameters
    ----------
    filename : str or PathLike
        Path of the pathline file
    verbose : bool
        Show verbose output. Default is False.

    Examples
    --------

    >>> import flopy
    >>> pl_file = flopy.utils.PathlineFile('model.mppth')
    >>> pl1 = pthobj.get_data(partid=1)

    """

    # record dtypes keyed by MODPATH version (3 and 5 share a layout)
    dtypes = {
        **dict.fromkeys(
            [3, 5],
            np.dtype(
                [
                    ("particleid", np.int32),
                    ("x", np.float32),
                    ("y", np.float32),
                    ("zloc", np.float32),
                    ("z", np.float32),
                    ("time", np.float32),
                    ("j", np.int32),
                    ("i", np.int32),
                    ("k", np.int32),
                    ("cumulativetimestep", np.int32),
                ]
            ),
        ),
        6: np.dtype(
            [
                ("particleid", np.int32),
                ("particlegroup", np.int32),
                ("timepointindex", np.int32),
                ("cumulativetimestep", np.int32),
                ("time", np.float32),
                ("x", np.float32),
                ("y", np.float32),
                ("z", np.float32),
                ("k", np.int32),
                ("i", np.int32),
                ("j", np.int32),
                ("grid", np.int32),
                ("xloc", np.float32),
                ("yloc", np.float32),
                ("zloc", np.float32),
                ("linesegmentindex", np.int32),
            ]
        ),
        7: np.dtype(
            [
                ("particleid", np.int32),
                ("particlegroup", np.int32),
                ("sequencenumber", np.int32),
                ("particleidloc", np.int32),
                ("time", np.float32),
                ("x", np.float32),
                ("y", np.float32),
                ("z", np.float32),
                ("k", np.int32),
                ("node", np.int32),
                ("xloc", np.float32),
                ("yloc", np.float32),
                ("zloc", np.float32),
                ("stressperiod", np.int32),
                ("timestep", np.int32),
            ]
        ),
    }

    # one-based index fields to convert to zero-based on load
    kijnames = [
        "k",
        "i",
        "j",
        "node",
        "particleid",
        "particlegroup",
        "linesegmentindex",
        "particleidloc",
        "sequencenumber",
    ]

    def __init__(
        self, filename: Union[str, os.PathLike], verbose: bool = False
    ):
        super().__init__(filename, verbose=verbose)
        self.dtype, self._data = self._load()
        self.nid = np.unique(self._data["particleid"])

    def _load(self) -> Tuple[np.dtype, np.ndarray]:
        """
        Load pathline data from the file.

        MODPATH 7 pathline files are grouped per particle, each group
        preceded by a header line; earlier versions are flat tables.

        Returns
        -------
        (np.dtype, np.ndarray)
            The record dtype and the data, sorted by particle ID and time
            with indices converted to zero-based.
        """
        dtype = self.dtypes[self.version]
        if self.version == 7:
            # dtype of a raw data row within a particle group (a subset
            # of the full record; constant per-particle fields are filled
            # in afterwards from the group header)
            dtyper = np.dtype(
                [
                    ("node", np.int32),
                    ("x", np.float32),
                    ("y", np.float32),
                    ("z", np.float32),
                    ("time", np.float32),
                    ("xloc", np.float32),
                    ("yloc", np.float32),
                    ("zloc", np.float32),
                    ("k", np.int32),
                    ("stressperiod", np.int32),
                    ("timestep", np.int32),
                ]
            )
            idx = 0
            particle_pathlines = {}
            nrows = 0
            with open(self.fname) as f:
                while True:
                    if idx == 0:
                        # skip the file header once
                        for n in range(self.skiprows):
                            line = f.readline()
                    # read header line
                    try:
                        line = f.readline().strip()
                        if self.verbose:
                            print(line)
                        if len(line) < 1:
                            break
                    except Exception:
                        break
                    t = [int(s) for j, s in enumerate(line.split()) if j < 4]
                    sequencenumber, group, particleid, pathlinecount = t[0:4]
                    nrows += pathlinecount
                    # read in the particle data
                    d = np.loadtxt(
                        itertools.islice(f, 0, pathlinecount), dtype=dtyper
                    )
                    key = (
                        idx,
                        sequencenumber,
                        group,
                        particleid,
                        pathlinecount,
                    )
                    particle_pathlines[key] = d.copy()
                    idx += 1

            # create data array
            data = np.zeros(nrows, dtype=dtype)

            # fill data
            ipos0 = 0
            for key, value in particle_pathlines.items():
                idx, sequencenumber, group, particleid, pathlinecount = key[
                    0:5
                ]
                ipos1 = ipos0 + pathlinecount
                # fill constant items for particle
                # particleid is not necessarily unique for all pathlines - use
                # sequencenumber which is unique
                data["particleid"][ipos0:ipos1] = sequencenumber
                # set particlegroup and sequence number
                data["particlegroup"][ipos0:ipos1] = group
                data["sequencenumber"][ipos0:ipos1] = sequencenumber
                # save particleidloc to particleid
                data["particleidloc"][ipos0:ipos1] = particleid
                # fill particle data
                for name in value.dtype.names:
                    data[name][ipos0:ipos1] = value[name]
                ipos0 = ipos1
        else:
            data = loadtxt(self.fname, dtype=dtype, skiprows=self.skiprows)

        # convert indices to zero-based
        for n in self.kijnames:
            if n in data.dtype.names:
                data[n] -= 1

        # sort by particle ID and time
        data.sort(order=["particleid", "time"])
        return dtype, data

    def get_destination_pathline_data(self, dest_cells, to_recarray=False):
        """
        Get pathline data that pass through a set of destination cells.

        Parameters
        ----------
        dest_cells : list or array of tuples
            (k, i, j) of each destination cell for MODPATH versions less than
            MODPATH 7 or node number of each destination cell. (zero based)
        to_recarray : bool
            Boolean that controls returned pthldest. If to_recarray is True,
            a single recarray with all of the pathlines that intersect
            dest_cells are returned. If to_recarray is False, a list of
            recarrays (the same form as returned by get_alldata method)
            that intersect dest_cells are returned (default is False).

        Returns
        -------
        np.recarray
            Slice of pathline data array (e.g. PathlineFile._data)
            containing only pathlines that pass through (k,i,j) or (node)
            dest_cells.

        Examples
        --------

        >>> import flopy
        >>> p = flopy.utils.PathlineFile('modpath.pathline')
        >>> p0 = p.get_destination_pathline_data([(0, 0, 0),
        ...                                       (1, 0, 0)])

        """
        return super().get_destination_data(
            dest_cells=dest_cells, to_recarray=to_recarray
        )

    def write_shapefile(
        self,
        data=None,
        pathline_data=None,
        one_per_particle=True,
        direction="ending",
        shpname="pathlines.shp",
        mg=None,
        crs=None,
        **kwargs,
    ):
        """
        Write pathlines to a shapefile.

        Parameters
        ----------
        data : np.recarray
            Record array of same form as that returned by
            .get_alldata() (if None, .get_alldata() is exported).
        pathline_data : np.recarray
            Record array of same form as that returned by
            .get_alldata() (if None, .get_alldata() is exported).

            .. deprecated:: 3.7
                The ``pathline_data`` option will be removed for FloPy 4. Use ``data`` instead.
        one_per_particle : boolean (default True)
            True writes a single LineString with a single set of attribute
            data for each particle. False writes a record/geometry for each
            pathline segment (each row in the Timeseries file). This option can
            be used to visualize attribute information (time, model layer,
            etc.) across a pathline in a GIS.
        direction : str
            String defining if starting or ending particle locations should be
            included in shapefile attribute information. Only used if
            one_per_particle=False. (default is 'ending')
        shpname : str
            File path for shapefile
        mg : flopy.discretization.grid instance
            Used to scale and rotate Global x,y,z values in MODPATH Timeseries
            file.
        crs : pyproj.CRS, int, str, optional
            Coordinate reference system (CRS) for the model grid
            (must be projected; geographic CRS are not supported).
            The value can be anything accepted by
            :meth:`pyproj.CRS.from_user_input() <pyproj.crs.CRS.from_user_input>`,
            such as an authority string (eg "EPSG:26916") or a WKT string.
        kwargs : keyword arguments to flopy.export.shapefile_utils.recarray2shp

            .. deprecated:: 3.5
                The following keyword options will be removed for FloPy 3.6:

                - ``epsg`` (int): use ``crs`` instead.
        """
        ParticleTrackFile.write_shapefile(
            self,
            data=data if data is not None else pathline_data,
            one_per_particle=one_per_particle,
            direction=direction,
            shpname=shpname,
            mg=mg,
            crs=crs,
            **kwargs,
        )
class EndpointFile(ModpathFile):
    """
    Particle endpoint file.

    Parameters
    ----------
    filename : str or PathLike
        Path of the endpoint file
    verbose : bool
        Show verbose output. Default is False.

    Examples
    --------

    >>> import flopy
    >>> ep_file = flopy.utils.EndpointFile('model.mpend')
    >>> ep1 = endobj.get_data(partid=1)

    """

    # record dtypes keyed by MODPATH version (3 and 5 share a layout)
    dtypes = {
        **dict.fromkeys(
            [3, 5],
            np.dtype(
                [
                    ("zone", np.int32),
                    ("j", np.int32),
                    ("i", np.int32),
                    ("k", np.int32),
                    ("x", np.float32),
                    ("y", np.float32),
                    ("z", np.float32),
                    ("zloc", np.float32),
                    ("time", np.float32),
                    ("x0", np.float32),
                    ("y0", np.float32),
                    ("zloc0", np.float32),
                    ("j0", np.int32),
                    ("i0", np.int32),
                    ("k0", np.int32),
                    ("zone0", np.int32),
                    ("cumulativetimestep", np.int32),
                    ("ipcode", np.int32),
                    ("time0", np.float32),
                ]
            ),
        ),
        6: np.dtype(
            [
                ("particleid", np.int32),
                ("particlegroup", np.int32),
                ("status", np.int32),
                ("time0", np.float32),
                ("time", np.float32),
                ("initialgrid", np.int32),
                ("k0", np.int32),
                ("i0", np.int32),
                ("j0", np.int32),
                ("cellface0", np.int32),
                ("zone0", np.int32),
                ("xloc0", np.float32),
                ("yloc0", np.float32),
                ("zloc0", np.float32),
                ("x0", np.float32),
                ("y0", np.float32),
                ("z0", np.float32),
                ("finalgrid", np.int32),
                ("k", np.int32),
                ("i", np.int32),
                ("j", np.int32),
                ("cellface", np.int32),
                ("zone", np.int32),
                ("xloc", np.float32),
                ("yloc", np.float32),
                ("zloc", np.float32),
                ("x", np.float32),
                ("y", np.float32),
                ("z", np.float32),
                ("label", "|S40"),
            ]
        ),
        7: np.dtype(
            [
                ("particleid", np.int32),
                ("particlegroup", np.int32),
                ("particleidloc", np.int32),
                ("status", np.int32),
                ("time0", np.float32),
                ("time", np.float32),
                ("node0", np.int32),
                ("k0", np.int32),
                ("xloc0", np.float32),
                ("yloc0", np.float32),
                ("zloc0", np.float32),
                ("x0", np.float32),
                ("y0", np.float32),
                ("z0", np.float32),
                ("zone0", np.int32),
                ("initialcellface", np.int32),
                ("node", np.int32),
                ("k", np.int32),
                ("xloc", np.float32),
                ("yloc", np.float32),
                ("zloc", np.float32),
                ("x", np.float32),
                ("y", np.float32),
                ("z", np.float32),
                ("zone", np.int32),
                ("cellface", np.int32),
            ]
        ),
    }

    # one-based index fields to convert to zero-based on load
    kijnames = [
        "k0",
        "i0",
        "j0",
        "node0",
        "k",
        "i",
        "j",
        "node",
        "particleid",
        "particlegroup",
        "particleidloc",
        "zone0",
        "zone",
    ]

    def __init__(
        self, filename: Union[str, os.PathLike], verbose: bool = False
    ):
        super().__init__(filename, verbose)
        self.dtype, self._data = self._load()
        self.nid = np.unique(self._data["particleid"])

    def _load(self) -> Tuple[np.dtype, np.ndarray]:
        """
        Load the endpoint table, convert one-based indices to zero-based,
        and synthesize a ``particleid`` field for MODPATH versions < 6
        (whose files carry no explicit particle ids).
        """
        dtype = self.dtypes[self.version]
        data = loadtxt(self.fname, dtype=dtype, skiprows=self.skiprows)

        # convert indices to zero-based
        for n in self.kijnames:
            if n in data.dtype.names:
                data[n] -= 1

        # add particle ids for earlier version of MODPATH
        if self.version < 6:
            shape = data.shape[0]
            pids = np.arange(1, shape + 1, 1, dtype=np.int32)
            data = append_fields(data, "particleid", pids)
        return dtype, data

    def get_maxtraveltime(self):
        """
        Get the maximum travel time.

        Returns
        ----------
        out : float
            Maximum travel time.
        """
        return (self._data["time"] - self._data["time0"]).max()

    def get_alldata(self):
        """
        Get endpoint data from the endpoint file for all endpoints.

        Returns
        ----------
        data : numpy record array
            A numpy recarray with the endpoint particle data

        Examples
        --------

        >>> import flopy
        >>> endobj = flopy.utils.EndpointFile('model.mpend')
        >>> e = endobj.get_alldata()

        """
        return self._data.view(np.recarray).copy()

    def get_destination_endpoint_data(self, dest_cells, source=False):
        """
        Get endpoint data that terminate in a set of destination cells.

        Parameters
        ----------
        dest_cells : list or array of tuples
            (k, i, j) of each destination cell for MODPATH versions less than
            MODPATH 7 or node number of each destination cell. (zero based)
        source : bool
            Boolean to specify is dest_cells applies to source or
            destination cells (default is False).

        Returns
        -------
        np.recarray
            Slice of endpoint data array (e.g. EndpointFile.get_alldata)
            containing only endpoint data with final locations in (k,i,j) or
            (node) dest_cells.

        Examples
        --------

        >>> import flopy
        >>> e = flopy.utils.EndpointFile('modpath.endpoint')
        >>> e0 = e.get_destination_endpoint_data([(0, 0, 0),
        ...                                       (1, 0, 0)])

        """
        # create local copy of _data
        data = self.get_alldata()

        # find the intersection of endpoints and dest_cells
        # convert dest_cells to same dtype for comparison
        if self.version < 7:
            if source:
                keys = ["k0", "i0", "j0"]
            else:
                keys = ["k", "i", "j"]
            try:
                raslice = repack_fields(data[keys])
            except (KeyError, ValueError):
                raise KeyError(
                    "could not extract "
                    + "', '".join(keys)
                    + " from endpoint data."
                )
        else:
            if source:
                keys = ["node0"]
            else:
                keys = ["node"]
            try:
                raslice = repack_fields(data[keys])
            except (KeyError, ValueError):
                msg = f"could not extract '{keys[0]}' key from endpoint data"
                raise KeyError(msg)
        if isinstance(dest_cells, (list, tuple)):
            allint = all(isinstance(el, int) for el in dest_cells)
            # convert to a list of tuples
            if allint:
                t = []
                for el in dest_cells:
                    t.append((el,))
                dest_cells = t
        dtype = []
        for key in keys:
            dtype.append((key, np.int32))
        dtype = np.dtype(dtype)
        dest_cells = np.array(dest_cells, dtype=dtype)

        inds = np.in1d(raslice, dest_cells)
        return data[inds].copy().view(np.recarray)

    def write_shapefile(
        self,
        data=None,
        endpoint_data=None,
        shpname="endpoints.shp",
        direction="ending",
        mg=None,
        crs=None,
        **kwargs,
    ):
        """
        Write particle starting / ending locations to shapefile.

        data : np.recarray
            Record array of same form as that returned by EndpointFile.get_alldata.
            (if none, EndpointFile.get_alldata() is exported).
        endpoint_data : np.recarray
            Record array of same form as that returned by EndpointFile.get_alldata.
            (if none, EndpointFile.get_alldata() is exported).

            .. deprecated:: 3.7
                The ``endpoint_data`` option will be removed for FloPy 4. Use ``data`` instead.
        shpname : str
            File path for shapefile
        direction : str
            String defining if starting or ending particle locations should be
            considered. (default is 'ending')
        mg : flopy.discretization.grid instance
            Used to scale and rotate Global x,y,z values in MODPATH Endpoint
            file.
        crs : pyproj.CRS, int, str, optional
            Coordinate reference system (CRS) for the model grid
            (must be projected; geographic CRS are not supported).
            The value can be anything accepted by
            :meth:`pyproj.CRS.from_user_input() <pyproj.crs.CRS.from_user_input>`,
            such as an authority string (eg "EPSG:26916") or a WKT string.
        kwargs : keyword arguments to flopy.export.shapefile_utils.recarray2shp

            .. deprecated:: 3.5
                The following keyword options will be removed for FloPy 3.6:

                - ``epsg`` (int): use ``crs`` instead.
        """
        from ..discretization import StructuredGrid
        from ..export.shapefile_utils import recarray2shp
        from ..utils import geometry
        from ..utils.geometry import Point

        # fall back to all endpoint data when neither argument is given;
        # guard BEFORE calling .copy() so passing nothing does not raise
        # AttributeError on None
        epd = data if data is not None else endpoint_data
        epd = self.get_alldata() if epd is None else epd.copy()

        if direction.lower() == "ending":
            xcol, ycol, zcol = "x", "y", "z"
        elif direction.lower() == "starting":
            xcol, ycol, zcol = "x0", "y0", "z0"
        else:
            raise Exception(
                'flopy.map.plot_endpoint direction must be "ending" '
                'or "starting".'
            )
        if mg is None:
            raise ValueError("A modelgrid object was not provided.")

        if isinstance(mg, StructuredGrid):
            x, y = geometry.transform(
                epd[xcol],
                epd[ycol],
                xoff=mg.xoffset,
                yoff=mg.yoffset,
                angrot_radians=mg.angrot_radians,
            )
        else:
            x, y = mg.get_coords(epd[xcol], epd[ycol])
        z = epd[zcol]

        geoms = [Point(x[i], y[i], z[i]) for i in range(len(epd))]
        # convert back to one-based
        for n in self.kijnames:
            if n in epd.dtype.names:
                epd[n] += 1
        recarray2shp(epd, geoms, shpname=shpname, crs=crs, **kwargs)
class TimeseriesFile(ModpathFile):
    """
    Particle timeseries file.

    Parameters
    ----------
    filename : str or PathLike
        Path of the timeseries file
    verbose : bool
        Show verbose output. Default is False.

    Examples
    --------

    >>> import flopy
    >>> ts_file = flopy.utils.TimeseriesFile('model.timeseries')
    >>> ts1 = tsobj.get_data(partid=1)
    """

    # record dtypes keyed by MODPATH version; 3 and 5 share the compact
    # layout (the non-compact 3/5 layout is built in _load)
    dtypes = {
        **dict.fromkeys(
            [3, 5],
            np.dtype(
                [
                    ("timestepindex", np.int32),
                    ("particleid", np.int32),
                    ("node", np.int32),
                    ("x", np.float32),
                    ("y", np.float32),
                    ("z", np.float32),
                    ("zloc", np.float32),
                    ("time", np.float32),
                    ("timestep", np.int32),
                ]
            ),
        ),
        6: np.dtype(
            [
                ("timepointindex", np.int32),
                ("timestep", np.int32),
                ("time", np.float32),
                ("particleid", np.int32),
                ("particlegroup", np.int32),
                ("x", np.float32),
                ("y", np.float32),
                ("z", np.float32),
                ("grid", np.int32),
                ("k", np.int32),
                ("i", np.int32),
                ("j", np.int32),
                ("xloc", np.float32),
                ("yloc", np.float32),
                ("zloc", np.float32),
            ]
        ),
        7: np.dtype(
            [
                ("timepointindex", np.int32),
                ("timestep", np.int32),
                ("time", np.float32),
                ("particleid", np.int32),
                ("particlegroup", np.int32),
                ("particleidloc", np.int32),
                ("node", np.int32),
                ("xloc", np.float32),
                ("yloc", np.float32),
                ("zloc", np.float32),
                ("x", np.float32),
                ("y", np.float32),
                ("z", np.float32),
                ("k", np.int32),
            ]
        ),
    }

    # one-based index fields to convert to zero-based on load
    kijnames = [
        "k",
        "i",
        "j",
        "node",
        "particleid",
        "particlegroup",
        "particleidloc",
        "timestep",
        "timestepindex",
        "timepointindex",
    ]

    def __init__(self, filename, verbose=False):
        super().__init__(filename, verbose)
        self.dtype, self._data = self._load()
        self.nid = np.unique(self._data["particleid"])

    def _load(self) -> Tuple[np.dtype, np.ndarray]:
        """
        Load the timeseries table, convert one-based indices to
        zero-based, and sort by particle ID and time.
        """
        dtype = self.dtypes[self.version]
        # non-compact MODPATH 3/5 files store (j, i, k) instead of a
        # single node number
        if self.version in [3, 5] and not self.compact:
            dtype = np.dtype(
                [
                    ("timestepindex", np.int32),
                    ("particleid", np.int32),
                    ("j", np.int32),
                    ("i", np.int32),
                    ("k", np.int32),
                    ("x", np.float32),
                    ("y", np.float32),
                    ("z", np.float32),
                    ("zloc", np.float32),
                    ("time", np.float32),
                    ("timestep", np.int32),
                ]
            )
        data = loadtxt(self.fname, dtype=dtype, skiprows=self.skiprows)

        # convert indices to zero-based
        for n in self.kijnames:
            if n in data.dtype.names:
                data[n] -= 1

        # sort by particle ID and time
        data.sort(order=["particleid", "time"])
        return dtype, data

    def get_destination_timeseries_data(self, dest_cells):
        """
        Get timeseries data that pass through a set of destination cells.

        Parameters
        ----------
        dest_cells : list or array of tuples
            (k, i, j) of each destination cell for MODPATH versions less than
            MODPATH 7 or node number of each destination cell. (zero based)

        Returns
        -------
        np.recarray
            Slice of timeseries data array (e.g. TimeseriesFile._data)
            containing only timeseries that pass through (k,i,j) or
            (node) dest_cells.

        Examples
        --------

        >>> import flopy
        >>> ts = flopy.utils.TimeseriesFile('modpath.timeseries')
        >>> tss = ts.get_destination_timeseries_data([(0, 0, 0),
        ...                                           (1, 0, 0)])

        """
        return super().get_destination_data(dest_cells=dest_cells)

    def write_shapefile(
        self,
        data=None,
        timeseries_data=None,
        one_per_particle=True,
        direction="ending",
        shpname="pathlines.shp",
        mg=None,
        crs=None,
        **kwargs,
    ):
        """
        Write timeseries to a shapefile

        data : np.recarray
            Record array of same form as that returned by
            Timeseries.get_alldata(). (if none, Timeseries.get_alldata()
            is exported).
        timeseries_data : np.recarray
            Record array of same form as that returned by
            Timeseries.get_alldata(). (if none, Timeseries.get_alldata()
            is exported).

            .. deprecated:: 3.7
                The ``timeseries_data`` option will be removed for FloPy 4. Use ``data`` instead.
        one_per_particle : boolean (default True)
            True writes a single LineString with a single set of attribute
            data for each particle. False writes a record/geometry for each
            pathline segment (each row in the Timeseries file). This option can
            be used to visualize attribute information (time, model layer,
            etc.) across a pathline in a GIS.
        direction : str
            String defining if starting or ending particle locations should be
            included in shapefile attribute information. Only used if
            one_per_particle=False. (default is 'ending')
        shpname : str
            File path for shapefile
        mg : flopy.discretization.grid instance
            Used to scale and rotate Global x,y,z values in MODPATH Timeseries
            file.
        crs : pyproj.CRS, int, str, optional
            Coordinate reference system (CRS) for the model grid
            (must be projected; geographic CRS are not supported).
            The value can be anything accepted by
            :meth:`pyproj.CRS.from_user_input() <pyproj.crs.CRS.from_user_input>`,
            such as an authority string (eg "EPSG:26916") or a WKT string.
        kwargs : keyword arguments to flopy.export.shapefile_utils.recarray2shp

            .. deprecated:: 3.5
                The following keyword options will be removed for FloPy 3.6:

                - ``epsg`` (int): use ``crs`` instead.
        """
        ParticleTrackFile.write_shapefile(
            self,
            data=data if data is not None else timeseries_data,
            one_per_particle=one_per_particle,
            direction=direction,
            shpname=shpname,
            mg=mg,
            crs=crs,
            **kwargs,
        )