Source code for flopy.utils.modpathfile

"""
Module to read MODPATH output files.  The module contains three
important classes that can be accessed by the user.

*  EndpointFile (ascii endpoint file)
*  PathlineFile (ascii pathline file)
*  TimeseriesFile (ascii timeseries file)

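Examples
--------
A minimal usage sketch; the file names are hypothetical:

>>> import flopy
>>> pthobj = flopy.utils.PathlineFile("model.mppth")
>>> endobj = flopy.utils.EndpointFile("model.mpend")
>>> tsobj = flopy.utils.TimeseriesFile("model.timeseries")
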
"""

import itertools

import numpy as np
from numpy.lib.recfunctions import append_fields, stack_arrays

from ..utils.flopy_io import loadtxt
from ..utils.recarray_utils import ra_slice


class _ModpathSeries(object):
    """
    Base class for PathlineFile and TimeseriesFile objects.

    This class serves to reduce the amount of duplicated code and to
    increase the maintainability of the MODPATH output methods.

    Parameters
    ----------
    filename : str
        Name of the pathline or timeseries file
    verbose : bool
        Write information to the screen. Default is False
    output_type : str
        pathline or timeseries file type

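    Examples
    --------
    The class is not meant to be used directly; it is accessed through the
    PathlineFile and TimeseriesFile subclasses, e.g. (the file name is
    hypothetical):

    >>> import flopy
    >>> pthobj = flopy.utils.PathlineFile('model.mppth')
    >>> tmax = pthobj.get_maxtime()
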
    """

    def __init__(self, filename, verbose=False, output_type="pathline"):
        self.fname = filename
        self.verbose = verbose
        self.output_type = output_type.upper()

        self._build_index()

        # set output type
        self.outdtype = self._get_outdtype()

    def _build_index(self):
        """
        Set position of the start of the pathline or timeseries data.
        """
        compact = False
        self.skiprows = 0
        self.file = open(self.fname, "r")
        while True:
            line = self.file.readline()
            if isinstance(line, bytes):
                line = line.decode()
            if self.skiprows < 1:
                if f"MODPATH_{self.output_type}_FILE 6" in line.upper():
                    self.version = 6
                elif (
                    f"MODPATH_{self.output_type}_FILE         7"
                    in line.upper()
                ):
                    self.version = 7
                elif "MODPATH 5.0" in line.upper():
                    self.version = 5
                    if "COMPACT" in line.upper():
                        compact = True
                elif "MODPATH Version 3.00" in line.upper():
                    self.version = 3
                else:
                    self.version = None
                if self.version is None:
                    errmsg = "{} is not a valid {} file".format(
                        self.fname, self.output_type.lower()
                    )
                    raise Exception(errmsg)
            self.skiprows += 1
            if self.version == 6 or self.version == 7:
                if "end header" in line.lower():
                    break
            elif self.version == 3 or self.version == 5:
                break

        # set compact
        self.compact = compact

        # return to start of file
        self.file.seek(0)

    def _get_outdtype(self):
        outdtype = np.dtype(
            [
                ("x", np.float32),
                ("y", np.float32),
                ("z", np.float32),
                ("time", np.float32),
                ("k", np.int32),
                ("particleid", np.int32),
            ]
        )
        return outdtype

    def get_maxid(self):
        """
        Get the maximum particle id in the pathline or timeseries file.

        Returns
        ----------
        out : int
            Maximum particle id.

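        Examples
        --------
        A short sketch, assuming a pathline file named 'model.mppth':

        >>> import flopy
        >>> pthobj = flopy.utils.PathlineFile('model.mppth')
        >>> max_id = pthobj.get_maxid()
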
        """
        return self._data["particleid"].max()

    def get_maxtime(self):
        """
        Get the maximum time in the pathline or timeseries file.

        Returns
        ----------
        out : float
            Maximum time.

        """
        return self._data["time"].max()

    def get_data(self, partid=0, totim=None, ge=True):
        """
        Get data from the output file for a single particle.

        Parameters
        ----------
        partid : int
            The zero-based particle id
        totim : float
            The simulation time.
        ge : bool
            Boolean that determines whether times greater than or equal to
            (ge=True) or less than or equal to (ge=False) totim are returned.

        Returns
        ----------
        ra : np.recarray
            Recarray with the x, y, z, time, k, and particleid.

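        Examples
        --------
        A short sketch, assuming a pathline file named 'model.mppth';
        the time value is arbitrary:

        >>> import flopy
        >>> pthobj = flopy.utils.PathlineFile('model.mppth')
        >>> p0 = pthobj.get_data(partid=0)
        >>> p0_early = pthobj.get_data(partid=0, totim=100.0, ge=False)
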
        """
        ra = self._data
        ra.sort(order=["particleid", "time"])
        if totim is not None:
            if ge:
                idx = np.where(
                    (ra["time"] >= totim) & (ra["particleid"] == partid)
                )[0]
            else:
                idx = np.where(
                    (ra["time"] <= totim) & (ra["particleid"] == partid)
                )[0]
        else:
            idx = np.where(ra["particleid"] == partid)[0]
        ra = ra[idx]
        return ra[["x", "y", "z", "time", "k", "particleid"]]

    def get_alldata(self, totim=None, ge=True):
        """
        Get data from the output file for all particles and all times.

        Parameters
        ----------
        totim : float
            The simulation time.
        ge : bool
            Boolean that determines whether times greater than or equal to
            (ge=True) or less than or equal to (ge=False) totim are returned.

        Returns
        ----------
        plist : list of numpy record arrays
            A list of numpy recarrays

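        Examples
        --------
        A short sketch, assuming a timeseries file named 'model.timeseries':

        >>> import flopy
        >>> tsobj = flopy.utils.TimeseriesFile('model.timeseries')
        >>> tslist = tsobj.get_alldata()
        >>> nparticles = len(tslist)
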
        """
        ra = self._data
        ra.sort(order=["particleid", "time"])
        if totim is not None:
            if ge:
                idx = np.where(ra["time"] >= totim)[0]
            else:
                idx = np.where(ra["time"] <= totim)[0]
            if len(idx) > 0:
                ra = ra[idx]
        ra = ra[["x", "y", "z", "time", "k", "particleid"]]
        return [ra[ra["particleid"] == i] for i in self.nid]

    def get_destination_data(self, dest_cells, to_recarray=True):
        """
        Get data for set of destination cells.

        Parameters
        ----------
        dest_cells : list or array of tuples
            (k, i, j) of each destination cell for MODPATH versions less than
            MODPATH 7 or node number of each destination cell. (zero based)
        to_recarray : bool
            Boolean that controls the form of the returned series. If
            to_recarray is True, a single recarray with all of the pathlines
            that intersect dest_cells is returned. If to_recarray is False, a
            list of recarrays (the same form as returned by the get_alldata
            method) that intersect dest_cells is returned (default is True).

        Returns
        -------
        series : np.recarray
            Slice of data array (e.g. PathlineFile._data, TimeseriesFile._data)
            containing endpoint, pathline, or timeseries data that intersect
            (k,i,j) or (node) dest_cells.

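        Examples
        --------
        A short sketch, assuming a MODPATH 6 pathline file and two
        hypothetical destination cells given as zero-based (k, i, j) tuples:

        >>> import flopy
        >>> pthobj = flopy.utils.PathlineFile('model.mppth')
        >>> dest = [(0, 10, 10), (0, 11, 10)]
        >>> pth = pthobj.get_destination_data(dest, to_recarray=True)
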
        """

        # create local copy of _data
        ra = np.array(self._data)

        # find the intersection of pathlines and dest_cells
        # convert dest_cells to same dtype for comparison
        if self.version < 7:
            try:
                raslice = ra[["k", "i", "j"]]
            except (KeyError, ValueError):
                raise KeyError(
                    "could not extract 'k', 'i', and 'j' keys "
                    "from {} data".format(self.output_type.lower())
                )
        else:
            try:
                raslice = ra[["node"]]
            except (KeyError, ValueError):
                msg = "could not extract 'node' key from {} data".format(
                    self.output_type.lower()
                )
                raise KeyError(msg)
            if isinstance(dest_cells, (list, tuple)):
                allint = all(isinstance(el, int) for el in dest_cells)
                # convert to a list of tuples
                if allint:
                    t = []
                    for el in dest_cells:
                        t.append((el,))
                    dest_cells = t

        dest_cells = np.array(dest_cells, dtype=raslice.dtype)
        inds = np.in1d(raslice, dest_cells)
        epdest = ra[inds].copy().view(np.recarray)

        if to_recarray:
            # use particle ids to get the rest of the paths
            inds = np.in1d(ra["particleid"], epdest.particleid)
            series = ra[inds].copy()
            series.sort(order=["particleid", "time"])
            series = series.view(np.recarray)
        else:
            # get list of unique particleids in selection
            partids = np.unique(epdest["particleid"])

            # build a list of pathline or timeseries recarrays for the
            # unique particleids
            series = [self.get_data(partid) for partid in partids]

        return series

    def write_shapefile(
        self,
        data=None,
        one_per_particle=True,
        direction="ending",
        shpname="endpoints.shp",
        mg=None,
        epsg=None,
        **kwargs,
    ):
        """
        Write pathlines or timeseries to a shapefile.

        Parameters
        ----------
        data : np.recarray
            Record array of same form as that returned by
            get_alldata(). (if None, get_alldata() is exported).
        one_per_particle : boolean (default True)
            True writes a single LineString with a single set of attribute
            data for each particle. False writes a record/geometry for each
            pathline segment (each row in the PathLine file). This option can
            be used to visualize attribute information (time, model layer,
            etc.) across a pathline in a GIS.
        direction : str
            String defining if starting or ending particle locations should be
            included in shapefile attribute information. Only used if
            one_per_particle=False. (default is 'ending')
        shpname : str
            File path for shapefile
        mg : flopy.discretization.grid instance
            Used to scale and rotate Global x,y,z values.
        epsg : int
            EPSG code for writing projection (.prj) file. If this is not
            supplied, the proj4 string or epsg code associated with mg will
            be used.
        kwargs : keyword arguments to flopy.export.shapefile_utils.recarray2shp

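        Examples
        --------
        A short sketch, assuming a pathline file and a model with a
        structured grid; the file names are hypothetical:

        >>> import flopy
        >>> pthobj = flopy.utils.PathlineFile('model.mppth')
        >>> m = flopy.modflow.Modflow.load('model.nam')
        >>> pthobj.write_shapefile(
        ...     one_per_particle=True,
        ...     shpname='pathlines.shp',
        ...     mg=m.modelgrid,
        ... )
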
        """
        from ..discretization import StructuredGrid
        from ..export.shapefile_utils import recarray2shp
        from ..utils import geometry
        from ..utils.geometry import LineString

        series = data
        if series is None:
            series = self._data.view(np.recarray)
        else:
            # convert pathline list to a single recarray
            if isinstance(series, list):
                s = series[0]
                if self.verbose:
                    print(s.dtype)
                for n in range(1, len(series)):
                    s = stack_arrays((s, series[n]))
                series = s.view(np.recarray)

        series = series.copy()
        series.sort(order=["particleid", "time"])

        if mg is None:
            raise ValueError("A modelgrid object was not provided.")

        if epsg is None:
            epsg = mg.epsg

        particles = np.unique(series.particleid)
        geoms = []

        # create dtype with select attributes in pth
        names = series.dtype.names
        dtype = []
        atts = ["particleid", "particlegroup", "time", "k", "i", "j", "node"]
        for att in atts:
            if att in names:
                t = np.int32
                if att == "time":
                    t = np.float32
                dtype.append((att, t))
        dtype = np.dtype(dtype)

        # reset names to the selected names in the created dtype
        names = dtype.names

        # 1 geometry for each path
        if one_per_particle:

            loc_inds = 0
            if direction == "ending":
                loc_inds = -1

            sdata = []
            for pid in particles:
                ra = series[series.particleid == pid]

                x, y = geometry.transform(
                    ra.x, ra.y, mg.xoffset, mg.yoffset, mg.angrot_radians
                )
                z = ra.z
                geoms.append(LineString(list(zip(x, y, z))))

                t = [pid]
                if "particlegroup" in names:
                    t.append(ra.particlegroup[0])
                t.append(ra.time.max())
                if "node" in names:
                    t.append(ra.node[loc_inds])
                else:
                    if "k" in names:
                        t.append(ra.k[loc_inds])
                    if "i" in names:
                        t.append(ra.i[loc_inds])
                    if "j" in names:
                        t.append(ra.j[loc_inds])
                sdata.append(tuple(t))

            sdata = np.array(sdata, dtype=dtype).view(np.recarray)

        # geometry for each row in PathLine file
        else:
            dtype = series.dtype
            sdata = []
            for pid in particles:
                ra = series[series.particleid == pid]
                if isinstance(mg, StructuredGrid):
                    x, y = geometry.transform(
                        ra.x, ra.y, mg.xoffset, mg.yoffset, mg.angrot_radians
                    )
                else:
                    x, y = mg.transform(ra.x, ra.y)
                z = ra.z
                geoms += [
                    LineString(
                        [(x[i - 1], y[i - 1], z[i - 1]), (x[i], y[i], z[i])]
                    )
                    for i in np.arange(1, (len(ra)))
                ]
                sdata += ra[1:].tolist()
            sdata = np.array(sdata, dtype=dtype).view(np.recarray)

        # convert back to one-based
        for n in set(self.kijnames).intersection(set(sdata.dtype.names)):
            sdata[n] += 1

        # write the final recarray to a shapefile
        recarray2shp(sdata, geoms, shpname=shpname, epsg=epsg, **kwargs)


[docs]class PathlineFile(_ModpathSeries):
    """
    PathlineFile Class.

    Parameters
    ----------
    filename : string
        Name of the pathline file
    verbose : bool
        Write information to the screen. Default is False.

    Examples
    --------

    >>> import flopy
    >>> pthobj = flopy.utils.PathlineFile('model.mppth')
    >>> p1 = pthobj.get_data(partid=1)

    """

    kijnames = [
        "k",
        "i",
        "j",
        "node",
        "particleid",
        "particlegroup",
        "linesegmentindex",
        "particleidloc",
        "sequencenumber",
    ]

    def __init__(self, filename, verbose=False):
        """
        Class constructor.
        """
        super().__init__(filename, verbose=verbose, output_type="pathline")

        # set data dtype and read pathline data
        if self.version == 7:
            self.dtype, self._data = self._get_mp7data()
        else:
            self.dtype = self._get_dtypes()
            self._data = loadtxt(
                self.file, dtype=self.dtype, skiprows=self.skiprows
            )

        # convert layer, row, and column indices; particle id and group; and
        # line segment indices to zero-based
        for n in self.kijnames:
            if n in self._data.dtype.names:
                self._data[n] -= 1

        # set number of particle ids
        self.nid = np.unique(self._data["particleid"])

        # close the input file
        self.file.close()

    def _get_dtypes(self):
        """
        Build numpy dtype for the MODPATH 6 pathline file.
        """
        if self.version == 3 or self.version == 5:
            dtype = np.dtype(
                [
                    ("particleid", np.int32),
                    ("x", np.float32),
                    ("y", np.float32),
                    ("zloc", np.float32),
                    ("z", np.float32),
                    ("time", np.float32),
                    ("j", np.int32),
                    ("i", np.int32),
                    ("k", np.int32),
                    ("cumulativetimestep", np.int32),
                ]
            )
        elif self.version == 6:
            dtype = np.dtype(
                [
                    ("particleid", np.int32),
                    ("particlegroup", np.int32),
                    ("timepointindex", np.int32),
                    ("cumulativetimestep", np.int32),
                    ("time", np.float32),
                    ("x", np.float32),
                    ("y", np.float32),
                    ("z", np.float32),
                    ("k", np.int32),
                    ("i", np.int32),
                    ("j", np.int32),
                    ("grid", np.int32),
                    ("xloc", np.float32),
                    ("yloc", np.float32),
                    ("zloc", np.float32),
                    ("linesegmentindex", np.int32),
                ]
            )
        elif self.version == 7:
            raise TypeError(
                "_get_dtypes() should not be called for "
                "MODPATH 7 pathline files"
            )
        return dtype

    def _get_mp7data(self):
        dtyper = np.dtype(
            [
                ("node", np.int32),
                ("x", np.float32),
                ("y", np.float32),
                ("z", np.float32),
                ("time", np.float32),
                ("xloc", np.float32),
                ("yloc", np.float32),
                ("zloc", np.float32),
                ("k", np.int32),
                ("stressperiod", np.int32),
                ("timestep", np.int32),
            ]
        )
        dtype = np.dtype(
            [
                ("particleid", np.int32),
                ("particlegroup", np.int32),
                ("sequencenumber", np.int32),
                ("particleidloc", np.int32),
                ("time", np.float32),
                ("x", np.float32),
                ("y", np.float32),
                ("z", np.float32),
                ("k", np.int32),
                ("node", np.int32),
                ("xloc", np.float32),
                ("yloc", np.float32),
                ("zloc", np.float32),
                ("stressperiod", np.int32),
                ("timestep", np.int32),
            ]
        )
        idx = 0
        part_dict = {}
        ndata = 0
        while True:
            if idx == 0:
                for n in range(self.skiprows):
                    line = self.file.readline()
            # read header line
            try:
                line = self.file.readline().strip()
                if self.verbose:
                    print(line)
                if len(line) < 1:
                    break
            except:
                break
            t = [int(s) for j, s in enumerate(line.split()) if j < 4]
            sequencenumber, group, particleid, pathlinecount = t[0:4]
            ndata += pathlinecount
            # read in the particle data
            d = np.loadtxt(
                itertools.islice(self.file, 0, pathlinecount), dtype=dtyper
            )
            key = (idx, sequencenumber, group, particleid, pathlinecount)
            part_dict[key] = d.copy()
            idx += 1

        # create data array
        data = np.zeros(ndata, dtype=dtype)

        # fill data
        ipos0 = 0
        for key, value in part_dict.items():
            idx, sequencenumber, group, particleid, pathlinecount = key[0:5]
            ipos1 = ipos0 + pathlinecount
            # fill constant items for particle
            # particleid is not necessarily unique for all pathlines - use
            # sequencenumber which is unique
            data["particleid"][ipos0:ipos1] = sequencenumber
            # set particlegroup and sequence number
            data["particlegroup"][ipos0:ipos1] = group
            data["sequencenumber"][ipos0:ipos1] = sequencenumber
            # save particleidloc to particleid
            data["particleidloc"][ipos0:ipos1] = particleid
            # fill particle data
            for name in value.dtype.names:
                data[name][ipos0:ipos1] = value[name]
            ipos0 = ipos1

        return dtype, data

[docs]    def get_maxid(self):
        """
        Get the maximum particle id in the pathline file.

        Returns
        ----------
        out : int
            Maximum particle id.

        """
        return super().get_maxid()

[docs]    def get_maxtime(self):
        """
        Get the maximum time in the pathline file.

        Returns
        ----------
        out : float
            Maximum pathline time.

        """
        return super().get_maxtime()

[docs] def get_data(self, partid=0, totim=None, ge=True): """ get pathline data from the pathline file for a single pathline. Parameters ---------- partid : int The zero-based particle id. The first record is record 0. totim : float The simulation time. All pathline points for particle partid that are greater than or equal to (ge=True) or less than or equal to (ge=False) totim will be returned. Default is None ge : bool Boolean that determines if pathline times greater than or equal to or less than or equal to totim is used to create a subset of pathlines. Default is True. Returns ---------- ra : numpy record array A numpy recarray with the x, y, z, time, k, and particleid for pathline partid. See Also -------- Notes ----- Examples -------- >>> import flopy.utils.modpathfile as mpf >>> pthobj = flopy.utils.PathlineFile('model.mppth') >>> p1 = pthobj.get_data(partid=1) """ return super().get_data(partid=partid, totim=totim, ge=ge)
[docs] def get_alldata(self, totim=None, ge=True): """ get pathline data from the pathline file for all pathlines and all times. Parameters ---------- totim : float The simulation time. All pathline points for particle partid that are greater than or equal to (ge=True) or less than or equal to (ge=False) totim will be returned. Default is None ge : bool Boolean that determines if pathline times greater than or equal to or less than or equal to totim is used to create a subset of pathlines. Default is True. Returns ---------- plist : a list of numpy record array A list of numpy recarrays with the x, y, z, time, k, and particleid for all pathlines. Examples -------- >>> import flopy.utils.modpathfile as mpf >>> pthobj = flopy.utils.PathlineFile('model.mppth') >>> p = pthobj.get_alldata() """ return super().get_alldata(totim=totim, ge=ge)
[docs] def get_destination_pathline_data(self, dest_cells, to_recarray=False): """ Get pathline data that pass through a set of destination cells. Parameters ---------- dest_cells : list or array of tuples (k, i, j) of each destination cell for MODPATH versions less than MODPATH 7 or node number of each destination cell. (zero based) to_recarray : bool Boolean that controls returned pthldest. If to_recarray is True, a single recarray with all of the pathlines that intersect dest_cells are returned. If to_recarray is False, a list of recarrays (the same form as returned by get_alldata method) that intersect dest_cells are returned (default is False). Returns ------- pthldest : np.recarray Slice of pathline data array (e.g. PathlineFile._data) containing only pathlines that pass through (k,i,j) or (node) dest_cells. Examples -------- >>> import flopy >>> p = flopy.utils.PathlineFile('modpath.pathline') >>> p0 = p.get_destination_pathline_data([(0, 0, 0), ... (1, 0, 0)]) """ return super().get_destination_data( dest_cells=dest_cells, to_recarray=to_recarray )
[docs] def write_shapefile( self, pathline_data=None, one_per_particle=True, direction="ending", shpname="pathlines.shp", mg=None, epsg=None, **kwargs, ): """ Write pathlines to a shapefile pathline_data : np.recarray Record array of same form as that returned by PathlineFile.get_alldata(). (if none, PathlineFile.get_alldata() is exported). one_per_particle : boolean (default True) True writes a single LineString with a single set of attribute data for each particle. False writes a record/geometry for each pathline segment (each row in the PathLine file). This option can be used to visualize attribute information (time, model layer, etc.) across a pathline in a GIS. direction : str String defining if starting or ending particle locations should be included in shapefile attribute information. Only used if one_per_particle=False. (default is 'ending') shpname : str File path for shapefile mg : flopy.discretization.grid instance Used to scale and rotate Global x,y,z values in MODPATH Pathline file. epsg : int EPSG code for writing projection (.prj) file. If this is not supplied, the proj4 string or epgs code associated with mg will be used. kwargs : keyword arguments to flopy.export.shapefile_utils.recarray2shp """ super().write_shapefile( data=pathline_data, one_per_particle=one_per_particle, direction=direction, shpname=shpname, mg=mg, epsg=epsg, **kwargs, )
[docs]class EndpointFile:
    """
    EndpointFile Class.

    Parameters
    ----------
    filename : string
        Name of the endpoint file
    verbose : bool
        Write information to the screen. Default is False.

    Examples
    --------

    >>> import flopy
    >>> endobj = flopy.utils.EndpointFile('model.mpend')
    >>> e1 = endobj.get_data(partid=1)

    """

    kijnames = [
        "k0",
        "i0",
        "j0",
        "node0",
        "k",
        "i",
        "j",
        "node",
        "particleid",
        "particlegroup",
        "particleidloc",
        "zone0",
        "zone",
    ]

    def __init__(self, filename, verbose=False):
        """
        Class constructor.
        """
        self.fname = filename
        self.verbose = verbose
        self._build_index()
        self.dtype = self._get_dtypes()
        self._data = loadtxt(
            self.file, dtype=self.dtype, skiprows=self.skiprows
        )

        # add particleid if required
        self._add_particleid()

        # convert layer, row, and column indices; particle id and group; and
        # line segment indices to zero-based
        for n in self.kijnames:
            if n in self._data.dtype.names:
                self._data[n] -= 1

        # set number of particle ids
        self.nid = np.unique(self._data["particleid"]).shape[0]

        # close the input file
        self.file.close()
        return

    def _build_index(self):
        """
        Set position of the start of the endpoint data.
        """
        self.skiprows = 0
        self.file = open(self.fname, "r")
        idx = 0
        while True:
            line = self.file.readline()
            if isinstance(line, bytes):
                line = line.decode()
            if self.skiprows < 1:
                if "MODPATH_ENDPOINT_FILE 6" in line.upper():
                    self.version = 6
                elif "MODPATH_ENDPOINT_FILE 7" in line.upper():
                    self.version = 7
                elif "MODPATH 5.0" in line.upper():
                    self.version = 5
                elif "MODPATH Version 3.00" in line.upper():
                    self.version = 3
                else:
                    self.version = None
                if self.version is None:
                    errmsg = f"{self.fname} is not a valid endpoint file"
                    raise Exception(errmsg)
            self.skiprows += 1
            if self.version == 6 or self.version == 7:
                if idx == 1:
                    t = line.strip()
                    self.direction = 1
                    if int(t[0]) == 2:
                        self.direction = -1
                idx += 1
                if "end header" in line.lower():
                    break
            else:
                break
        self.file.seek(0)

        if self.verbose:
            print(f"MODPATH version {self.version} endpoint file")

    def _get_dtypes(self):
        """
        Build numpy dtype for the MODPATH 6 endpoint file.
        """
        if self.version == 3 or self.version == 5:
            dtype = self._get_mp35_dtype()
        elif self.version == 6:
            dtype = self._get_mp6_dtype()
        elif self.version == 7:
            dtype = self._get_mp7_dtype()
        return dtype

    def _get_mp35_dtype(self, add_id=False):
        dtype = [
            ("zone", np.int32),
            ("j", np.int32),
            ("i", np.int32),
            ("k", np.int32),
            ("x", np.float32),
            ("y", np.float32),
            ("z", np.float32),
            ("zloc", np.float32),
            ("time", np.float32),
            ("x0", np.float32),
            ("y0", np.float32),
            ("zloc0", np.float32),
            ("j0", np.int32),
            ("i0", np.int32),
            ("k0", np.int32),
            ("zone0", np.int32),
            ("cumulativetimestep", np.int32),
            ("ipcode", np.int32),
            ("time0", np.float32),
        ]
        if add_id:
            dtype.insert(0, ("particleid", np.int32))
        return np.dtype(dtype)

    def _get_mp6_dtype(self):
        dtype = [
            ("particleid", np.int32),
            ("particlegroup", np.int32),
            ("status", np.int32),
            ("time0", np.float32),
            ("time", np.float32),
            ("initialgrid", np.int32),
            ("k0", np.int32),
            ("i0", np.int32),
            ("j0", np.int32),
            ("cellface0", np.int32),
            ("zone0", np.int32),
            ("xloc0", np.float32),
            ("yloc0", np.float32),
            ("zloc0", np.float32),
            ("x0", np.float32),
            ("y0", np.float32),
            ("z0", np.float32),
            ("finalgrid", np.int32),
            ("k", np.int32),
            ("i", np.int32),
            ("j", np.int32),
            ("cellface", np.int32),
            ("zone", np.int32),
            ("xloc", np.float32),
            ("yloc", np.float32),
            ("zloc", np.float32),
            ("x", np.float32),
            ("y", np.float32),
            ("z", np.float32),
            ("label", "|S40"),
        ]
        return np.dtype(dtype)

    def _get_mp7_dtype(self):
        dtype = [
            ("particleid", np.int32),
            ("particlegroup", np.int32),
            ("particleidloc", np.int32),
            ("status", np.int32),
            ("time0", np.float32),
            ("time", np.float32),
            ("node0", np.int32),
            ("k0", np.int32),
            ("xloc0", np.float32),
            ("yloc0", np.float32),
            ("zloc0", np.float32),
            ("x0", np.float32),
            ("y0", np.float32),
            ("z0", np.float32),
            ("zone0", np.int32),
            ("initialcellface", np.int32),
            ("node", np.int32),
            ("k", np.int32),
            ("xloc", np.float32),
            ("yloc", np.float32),
            ("zloc", np.float32),
            ("x", np.float32),
            ("y", np.float32),
            ("z", np.float32),
            ("zone", np.int32),
            ("cellface", np.int32),
        ]
        return np.dtype(dtype)

    def _add_particleid(self):
        # add particle ids for earlier version of MODPATH
        if self.version < 6:
            # create particle ids
            shaped = self._data.shape[0]
            pids = np.arange(1, shaped + 1, 1, dtype=np.int32)
            # for numpy version 1.14 and higher
            self._data = append_fields(self._data, "particleid", pids)
        return

[docs]    def get_maxid(self):
        """
        Get the maximum endpoint particle id in the endpoint file.

        Returns
        ----------
        out : int
            Maximum endpoint particle id.

        """
        return np.unique(self._data["particleid"]).max()

[docs]    def get_maxtime(self):
        """
        Get the maximum time in the endpoint file.

        Returns
        ----------
        out : float
            Maximum endpoint time.

        """
        return self._data["time"].max()

[docs]    def get_maxtraveltime(self):
        """
        Get the maximum travel time in the endpoint file.

        Returns
        ----------
        out : float
            Maximum endpoint travel time.

        """
        return (self._data["time"] - self._data["time0"]).max()

[docs] def get_data(self, partid=0): """ Get endpoint data from the endpoint file for a single particle. Parameters ---------- partid : int The zero-based particle id. The first record is record 0. (default is 0) Returns ---------- ra : numpy record array A numpy recarray with the endpoint particle data for endpoint partid. See Also -------- Notes ----- Examples -------- >>> import flopy >>> endobj = flopy.utils.EndpointFile('model.mpend') >>> e1 = endobj.get_data(partid=1) """ idx = self._data["particleid"] == partid ra = self._data[idx] return ra
[docs] def get_alldata(self): """ Get endpoint data from the endpoint file for all endpoints. Parameters ---------- Returns ---------- ra : numpy record array A numpy recarray with the endpoint particle data See Also -------- Notes ----- Examples -------- >>> import flopy >>> endobj = flopy.utils.EndpointFile('model.mpend') >>> e = endobj.get_alldata() """ return self._data.view(np.recarray).copy()
[docs] def get_destination_endpoint_data(self, dest_cells, source=False): """ Get endpoint data that terminate in a set of destination cells. Parameters ---------- dest_cells : list or array of tuples (k, i, j) of each destination cell for MODPATH versions less than MODPATH 7 or node number of each destination cell. (zero based) source : bool Boolean to specify is dest_cells applies to source or destination cells (default is False). Returns ------- epdest : np.recarray Slice of endpoint data array (e.g. EndpointFile.get_alldata) containing only endpoint data with final locations in (k,i,j) or (node) dest_cells. Examples -------- >>> import flopy >>> e = flopy.utils.EndpointFile('modpath.endpoint') >>> e0 = e.get_destination_endpoint_data([(0, 0, 0), ... (1, 0, 0)]) """ # create local copy of _data ra = self.get_alldata() # find the intersection of endpoints and dest_cells # convert dest_cells to same dtype for comparison if self.version < 7: if source: keys = ["k0", "i0", "j0"] else: keys = ["k", "i", "j"] try: raslice = ra_slice(ra, keys) except (KeyError, ValueError): raise KeyError( "could not extract " + "', '".join(keys) + " from endpoint data." ) else: if source: keys = ["node0"] else: keys = ["node"] try: raslice = ra_slice(ra, keys) except (KeyError, ValueError): msg = f"could not extract '{keys[0]}' key from endpoint data" raise KeyError(msg) if isinstance(dest_cells, (list, tuple)): allint = all(isinstance(el, int) for el in dest_cells) # convert to a list of tuples if allint: t = [] for el in dest_cells: t.append((el,)) dest_cells = t dtype = [] for key in keys: dtype.append((key, np.int32)) dtype = np.dtype(dtype) dest_cells = np.array(dest_cells, dtype=dtype) inds = np.in1d(raslice, dest_cells) epdest = ra[inds].copy().view(np.recarray) return epdest
[docs] def write_shapefile( self, endpoint_data=None, shpname="endpoints.shp", direction="ending", mg=None, epsg=None, **kwargs, ): """ Write particle starting / ending locations to shapefile. endpoint_data : np.recarray Record array of same form as that returned by EndpointFile.get_alldata. (if none, EndpointFile.get_alldata() is exported). shpname : str File path for shapefile direction : str String defining if starting or ending particle locations should be considered. (default is 'ending') mg : flopy.discretization.grid instance Used to scale and rotate Global x,y,z values in MODPATH Endpoint file. epsg : int EPSG code for writing projection (.prj) file. If this is not supplied, the proj4 string or epgs code associated with mg will be used. kwargs : keyword arguments to flopy.export.shapefile_utils.recarray2shp """ from ..discretization import StructuredGrid from ..export.shapefile_utils import recarray2shp from ..utils import geometry from ..utils.geometry import Point epd = endpoint_data.copy() if epd is None: epd = self.get_alldata() if direction.lower() == "ending": xcol, ycol, zcol = "x", "y", "z" elif direction.lower() == "starting": xcol, ycol, zcol = "x0", "y0", "z0" else: raise Exception( 'flopy.map.plot_endpoint direction must be "ending" ' 'or "starting".' ) if mg is None: raise ValueError("A modelgrid object was not provided.") if epsg is None: epsg = mg.epsg if isinstance(mg, StructuredGrid): x, y = geometry.transform( epd[xcol], epd[ycol], xoff=mg.xoffset, yoff=mg.yoffset, angrot_radians=mg.angrot_radians, ) else: x, y = mg.get_coords(epd[xcol], epd[ycol]) z = epd[zcol] geoms = [Point(x[i], y[i], z[i]) for i in range(len(epd))] # convert back to one-based for n in self.kijnames: if n in epd.dtype.names: epd[n] += 1 recarray2shp(epd, geoms, shpname=shpname, epsg=epsg, **kwargs)
[docs]class TimeseriesFile(_ModpathSeries): """ TimeseriesFile Class. Parameters ---------- filename : string Name of the timeseries file verbose : bool Write information to the screen. Default is False. Examples -------- >>> import flopy >>> tsobj = flopy.utils.TimeseriesFile('model.timeseries') >>> ts1 = tsobj.get_data(partid=1) """ kijnames = [ "k", "i", "j", "node", "particleid", "particlegroup", "particleidloc", "timestep", "timestepindex", "timepointindex", ] def __init__(self, filename, verbose=False): """ Class constructor. """ super().__init__(filename, verbose=verbose, output_type="timeseries") # set dtype self.dtype = self._get_dtypes() # read data self._data = loadtxt( self.file, dtype=self.dtype, skiprows=self.skiprows ) # convert layer, row, and column indices; particle id and group; and # line segment indices to zero-based for n in self.kijnames: if n in self._data.dtype.names: self._data[n] -= 1 # set number of particle ids self.nid = np.unique(self._data["particleid"]) # close the input file self.file.close() return def _build_index(self): """ Set position of the start of the timeseries data. """ compact = False self.skiprows = 0 self.file = open(self.fname, "r") while True: line = self.file.readline() if isinstance(line, bytes): line = line.decode() if self.skiprows < 1: if "MODPATH_TIMESERIES_FILE 6" in line.upper(): self.version = 6 elif "MODPATH_TIMESERIES_FILE 7" in line.upper(): self.version = 7 elif "MODPATH 5.0" in line.upper(): self.version = 5 if "COMPACT" in line.upper(): compact = True elif "MODPATH Version 3.00" in line.upper(): self.version = 3 else: self.version = None if self.version is None: raise Exception( f"{self.fname} is not a valid timeseries file" ) self.skiprows += 1 if self.version == 6 or self.version == 7: if "end header" in line.lower(): break elif self.version == 3 or self.version == 5: break # set compact self.compact = compact # return to the top of the file self.file.seek(0) def _get_dtypes(self): """ Build numpy dtype for the MODPATH 6 timeseries file. """ if self.version == 3 or self.version == 5: if self.compact: dtype = np.dtype( [ ("timestepindex", np.int32), ("particleid", np.int32), ("node", np.int32), ("x", np.float32), ("y", np.float32), ("z", np.float32), ("zloc", np.float32), ("time", np.float32), ("timestep", np.int32), ] ) else: dtype = np.dtype( [ ("timestepindex", np.int32), ("particleid", np.int32), ("j", np.int32), ("i", np.int32), ("k", np.int32), ("x", np.float32), ("y", np.float32), ("z", np.float32), ("zloc", np.float32), ("time", np.float32), ("timestep", np.int32), ] ) elif self.version == 6: dtype = np.dtype( [ ("timepointindex", np.int32), ("timestep", np.int32), ("time", np.float32), ("particleid", np.int32), ("particlegroup", np.int32), ("x", np.float32), ("y", np.float32), ("z", np.float32), ("grid", np.int32), ("k", np.int32), ("i", np.int32), ("j", np.int32), ("xloc", np.float32), ("yloc", np.float32), ("zloc", np.float32), ] ) elif self.version == 7: dtype = np.dtype( [ ("timepointindex", np.int32), ("timestep", np.int32), ("time", np.float32), ("particleid", np.int32), ("particlegroup", np.int32), ("particleidloc", np.int32), ("node", np.int32), ("xloc", np.float32), ("yloc", np.float32), ("zloc", np.float32), ("x", np.float32), ("y", np.float32), ("z", np.float32), ("k", np.int32), ] ) return dtype def _get_outdtype(self): outdtype = np.dtype( [ ("x", np.float32), ("y", np.float32), ("z", np.float32), ("time", np.float32), ("k", np.int32), ("particleid", np.int32), ] ) return outdtype
[docs]    def get_maxid(self):
        """
        Get the maximum particle id in the timeseries file.

        Returns
        ----------
        out : int
            Maximum particle id.

        """
        return super().get_maxid()

[docs]    def get_maxtime(self):
        """
        Get the maximum time in the timeseries file.

        Returns
        ----------
        out : float
            Maximum timeseries time.

        """
        return super().get_maxtime()

[docs] def get_data(self, partid=0, totim=None, ge=True): """ get timeseries data from the timeseries file for a single timeseries particleid. Parameters ---------- partid : int The zero-based particle id. The first record is record 0. totim : float The simulation time. All timeseries points for particle partid that are greater than or equal to (ge=True) or less than or equal to (ge=False) totim will be returned. Default is None ge : bool Boolean that determines if timeseries times greater than or equal to or less than or equal to totim is used to create a subset of timeseries. Default is True. Returns ---------- ra : numpy record array A numpy recarray with the x, y, z, time, k, and particleid for timeseries partid. See Also -------- Notes ----- Examples -------- >>> import flopy >>> tsobj = flopy.utils.TimeseriesFile('model.timeseries') >>> ts1 = tsobj.get_data(partid=1) """ return super().get_data(partid=partid, totim=totim, ge=ge)
[docs] def get_alldata(self, totim=None, ge=True): """ get timeseries data from the timeseries file for all timeseries and all times. Parameters ---------- totim : float The simulation time. All timeseries points for timeseries partid that are greater than or equal to (ge=True) or less than or equal to (ge=False) totim will be returned. Default is None ge : bool Boolean that determines if timeseries times greater than or equal to or less than or equal to totim is used to create a subset of timeseries. Default is True. Returns ---------- tlist : a list of numpy record array A list of numpy recarrays with the x, y, z, time, k, and particleid for all timeseries. Examples -------- >>> import flopy >>> tsobj = flopy.utils.TimeseriesFile('model.timeseries') >>> ts = tsobj.get_alldata() """ return super().get_alldata(totim=totim, ge=ge)
[docs] def get_destination_timeseries_data(self, dest_cells): """ Get timeseries data that pass through a set of destination cells. Parameters ---------- dest_cells : list or array of tuples (k, i, j) of each destination cell for MODPATH versions less than MODPATH 7 or node number of each destination cell. (zero based) Returns ------- tsdest : np.recarray Slice of timeseries data array (e.g. TmeseriesFile._data) containing only timeseries that pass through (k,i,j) or (node) dest_cells. Examples -------- >>> import flopy >>> ts = flopy.utils.TimeseriesFile('modpath.timeseries') >>> tss = ts.get_destination_timeseries_data([(0, 0, 0), ... (1, 0, 0)]) """ return super().get_destination_data(dest_cells=dest_cells)
[docs] def write_shapefile( self, timeseries_data=None, one_per_particle=True, direction="ending", shpname="pathlines.shp", mg=None, epsg=None, **kwargs, ): """ Write pathlines to a shapefile timeseries_data : np.recarray Record array of same form as that returned by Timeseries.get_alldata(). (if none, Timeseries.get_alldata() is exported). one_per_particle : boolean (default True) True writes a single LineString with a single set of attribute data for each particle. False writes a record/geometry for each pathline segment (each row in the Timeseries file). This option can be used to visualize attribute information (time, model layer, etc.) across a pathline in a GIS. direction : str String defining if starting or ending particle locations should be included in shapefile attribute information. Only used if one_per_particle=False. (default is 'ending') shpname : str File path for shapefile mg : flopy.discretization.grid instance Used to scale and rotate Global x,y,z values in MODPATH Timeseries file. epsg : int EPSG code for writing projection (.prj) file. If this is not supplied, the proj4 string or epgs code associated with mg will be used. kwargs : keyword arguments to flopy.export.shapefile_utils.recarray2shp """ super().write_shapefile( data=timeseries_data, one_per_particle=one_per_particle, direction=direction, shpname=shpname, mg=mg, epsg=epsg, **kwargs, )