Source code for flopy.export.shapefile_utils

"""
Module for exporting and importing flopy model attributes

"""

import copy
import json
import os
import shutil
import sys
import warnings
from pathlib import Path
from typing import List, Optional, Union
from warnings import warn

import numpy as np

from ..datbase import DataInterface, DataType
from ..discretization.grid import Grid
from ..utils import Util3d, flopy_io, import_optional_dependency
from ..utils.crs import get_crs


def write_gridlines_shapefile(filename: Union[str, os.PathLike], mg):
    """
    Write a polyline shapefile of the grid lines - a lightweight alternative
    to polygons.

    Parameters
    ----------
    filename : str or PathLike
        path of the shapefile to write
    mg : flopy.discretization.grid.Grid object
        flopy model grid

    Returns
    -------
    None
    """
    shapefile = import_optional_dependency("shapefile")
    if not isinstance(mg, Grid):
        raise ValueError(
            f"'mg' must be a flopy Grid subclass instance; found '{type(mg)}'"
        )
    wr = shapefile.Writer(str(filename), shapeType=shapefile.POLYLINE)
    wr.field("number", "N", 18, 0)
    grid_lines = mg.grid_lines
    for i, line in enumerate(grid_lines):
        wr.line([line])
        wr.record(i)

    wr.close()
    try:
        write_prj(filename, modelgrid=mg)
    except ImportError:
        pass
    return
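
# Example (sketch, not part of the module source): writing grid lines for a
# small structured grid. The delr/delc values below are illustrative and
# pyshp must be installed.
#
#     import numpy as np
#     from flopy.discretization import StructuredGrid
#     from flopy.export.shapefile_utils import write_gridlines_shapefile
#
#     grid = StructuredGrid(delr=np.full(10, 100.0), delc=np.full(5, 100.0))
#     write_gridlines_shapefile("gridlines.shp", grid)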


def write_grid_shapefile(
    path: Union[str, os.PathLike],
    mg,
    array_dict,
    nan_val=np.nan,
    crs=None,
    prjfile: Optional[Union[str, os.PathLike]] = None,
    verbose=False,
    **kwargs,
):
    """
    Method to write a shapefile of gridded input data

    Parameters
    ----------
    path : str or PathLike
        shapefile file path
    mg : flopy.discretization.grid.Grid object
        flopy model grid
    array_dict : dict
        dictionary of model input arrays
    nan_val : float
        value to fill nans
    crs : pyproj.CRS, int, str, optional if `prjfile` is specified
        Coordinate reference system (CRS) for the model grid
        (must be projected; geographic CRS are not supported).
        The value can be anything accepted by
        :meth:`pyproj.CRS.from_user_input() <pyproj.crs.CRS.from_user_input>`,
        such as an authority string (eg "EPSG:26916") or a WKT string.
    prjfile : str or pathlike, optional if `crs` is specified
        ESRI-style projection file with well-known text defining the CRS
        for the model grid (must be projected; geographic CRS are not
        supported).
    **kwargs : dict, optional
        Support deprecated keyword options.

        .. deprecated:: 3.5
            The following keyword options will be removed for FloPy 3.6:

              - ``epsg`` (int): use ``crs`` instead.
              - ``prj`` (str or PathLike): use ``prjfile`` instead.

    Returns
    -------
    None
    """
    shapefile = import_optional_dependency("shapefile")
    w = shapefile.Writer(str(path), shapeType=shapefile.POLYGON)
    w.autoBalance = 1

    if not isinstance(mg, Grid):
        raise ValueError(
            f"'mg' must be a flopy Grid subclass instance; found '{type(mg)}'"
        )
    elif mg.grid_type == "structured":
        verts = [
            mg.get_cell_vertices(i, j)
            for i in range(mg.nrow)
            for j in range(mg.ncol)
        ]
    elif mg.grid_type == "vertex":
        verts = [mg.get_cell_vertices(cellid) for cellid in range(mg.ncpl)]
    elif mg.grid_type == "unstructured":
        verts = [mg.get_cell_vertices(cellid) for cellid in range(mg.nnodes)]
    else:
        raise NotImplementedError(f"Grid type {mg.grid_type} not supported.")

    # set up the attribute fields and arrays of attributes
    if mg.grid_type == "structured":
        names = ["node", "row", "column"] + list(array_dict.keys())
        dtypes = [
            ("node", np.dtype("int")),
            ("row", np.dtype("int")),
            ("column", np.dtype("int")),
        ] + [
            (enforce_10ch_limit([name])[0], array_dict[name].dtype)
            for name in names[3:]
        ]
        node = list(range(1, mg.ncol * mg.nrow + 1))
        col = list(range(1, mg.ncol + 1)) * mg.nrow
        row = sorted(list(range(1, mg.nrow + 1)) * mg.ncol)
        at = np.vstack(
            [node, row, col]
            + [array_dict[name].ravel() for name in names[3:]]
        ).transpose()

        names = enforce_10ch_limit(names)

    elif mg.grid_type == "vertex":
        names = ["node"] + list(array_dict.keys())
        dtypes = [("node", np.dtype("int"))] + [
            (enforce_10ch_limit([name])[0], array_dict[name].dtype)
            for name in names[1:]
        ]
        node = list(range(1, mg.ncpl + 1))
        at = np.vstack(
            [node] + [array_dict[name].ravel() for name in names[1:]]
        ).transpose()

        names = enforce_10ch_limit(names)

    elif mg.grid_type == "unstructured":
        if mg.nlay is None:
            names = ["node"] + list(array_dict.keys())
            dtypes = [("node", np.dtype("int"))] + [
                (enforce_10ch_limit([name])[0], array_dict[name].dtype)
                for name in names[1:]
            ]
            node = list(range(1, mg.nnodes + 1))
            at = np.vstack(
                [node] + [array_dict[name].ravel() for name in names[1:]]
            ).transpose()
        else:
            names = ["node", "layer"] + list(array_dict.keys())
            dtypes = [
                ("node", np.dtype("int")),
                ("layer", np.dtype("int")),
            ] + [
                (enforce_10ch_limit([name])[0], array_dict[name].dtype)
                for name in names[2:]
            ]
            node = list(range(1, mg.nnodes + 1))
            layer = np.zeros(mg.nnodes)
            for ilay in range(mg.nlay):
                istart, istop = mg.get_layer_node_range(ilay)
                layer[istart:istop] = ilay + 1
            at = np.vstack(
                [node]
                + [layer]
                + [array_dict[name].ravel() for name in names[2:]]
            ).transpose()

        names = enforce_10ch_limit(names)

    # flag nan values and explicitly set the dtypes
    if at.dtype in [float, np.float32, np.float64]:
        at[np.isnan(at)] = nan_val
    at = np.array([tuple(i) for i in at], dtype=dtypes)

    # write field information
    fieldinfo = {
        name: get_pyshp_field_info(dtype.name) for name, dtype in dtypes
    }
    for n in names:
        w.field(n, *fieldinfo[n])

    for i, r in enumerate(at):
        # check if polygon is closed, if not close polygon for QGIS
        if verts[i][-1] != verts[i][0]:
            verts[i] = verts[i] + [verts[i][0]]
        w.poly([verts[i]])
        w.record(*r)

    # close
    w.close()
    if verbose:
        print(f"wrote {flopy_io.relpath_safe(path)}")

    # handle deprecated projection kwargs; warnings are raised in crs.py
    write_prj_args = {}
    if "epsg" in kwargs:
        write_prj_args["epsg"] = kwargs.pop("epsg")
    if "prj" in kwargs:
        write_prj_args["prj"] = kwargs.pop("prj")
    if kwargs:
        raise TypeError(f"unhandled keywords: {kwargs}")

    # write the projection file
    try:
        write_prj(path, mg, crs=crs, prjfile=prjfile, **write_prj_args)
    except ImportError:
        if verbose:
            print("projection file not written")
    return
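
# Example (sketch, not part of the module source): exporting a dictionary of
# model arrays on a 5-row by 10-column structured grid. Array names longer
# than 10 characters are shortened by enforce_10ch_limit before the fields
# are written. The CRS string is illustrative and requires pyproj.
#
#     import numpy as np
#     from flopy.discretization import StructuredGrid
#     from flopy.export.shapefile_utils import write_grid_shapefile
#
#     grid = StructuredGrid(delr=np.full(10, 100.0), delc=np.full(5, 100.0))
#     arrays = {
#         "top": np.full((5, 10), 320.0),
#         "ibound": np.ones((5, 10), dtype=int),
#     }
#     write_grid_shapefile("grid.shp", grid, arrays, crs="EPSG:26916")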


def model_attributes_to_shapefile(
    path: Union[str, os.PathLike],
    ml,
    package_names=None,
    array_dict=None,
    verbose=False,
    **kwargs,
):
    """
    Wrapper function for writing a shapefile of model data. If package_names
    is not None, then search through the requested packages looking for arrays
    that can be added to the shapefile as attributes

    Parameters
    ----------
    path : str or PathLike
        path to write the shapefile to
    ml : flopy.mbase
        model instance
    package_names : list of package names (e.g. ["dis","lpf"])
        Packages to export data arrays to shapefile. (default is None)
    array_dict : dict of {name:2D array} pairs
        Additional 2D arrays to add as attributes to the shapefile.
        (default is None)
    verbose : bool, optional, default False
        whether to print verbose output
    **kwargs : keyword arguments
        modelgrid : fp.modflow.Grid object
            if modelgrid is supplied, user supplied modelgrid is used in lieu
            of the modelgrid attached to the modflow model object
        crs : pyproj.CRS, int, str, optional if `prjfile` is specified
            Coordinate reference system (CRS) for the model grid
            (must be projected; geographic CRS are not supported).
            The value can be anything accepted by
            :meth:`pyproj.CRS.from_user_input() <pyproj.crs.CRS.from_user_input>`,
            such as an authority string (eg "EPSG:26916") or a WKT string.
        prjfile : str or pathlike, optional if `crs` is specified
            ESRI-style projection file with well-known text defining the CRS
            for the model grid (must be projected; geographic CRS are not
            supported).

    Returns
    -------
    None

    Examples
    --------
    >>> import flopy
    >>> m = flopy.modflow.Modflow()
    >>> flopy.utils.model_attributes_to_shapefile('model.shp', m)

    """
    if array_dict is None:
        array_dict = {}

    if package_names is not None:
        if not isinstance(package_names, list):
            package_names = [package_names]
    else:
        package_names = [pak.name[0] for pak in ml.packagelist]

    if "modelgrid" in kwargs:
        grid = kwargs.pop("modelgrid")
    else:
        grid = ml.modelgrid

    horz_shape = grid.get_plottable_layer_shape()
    for pname in package_names:
        pak = ml.get_package(pname)
        attrs = dir(pak)
        if pak is not None:
            if "start_datetime" in attrs:
                attrs.remove("start_datetime")
            for attr in attrs:
                a = pak.__getattribute__(attr)
                if (
                    a is None
                    or not hasattr(a, "data_type")
                    or a.name == "thickness"
                ):
                    continue
                if a.data_type == DataType.array2d:
                    if a.array is None or a.array.shape != horz_shape:
                        warn(
                            "Failed to get data for "
                            f"{a.name} array, {pak.name[0]} package"
                        )
                        continue
                    name = shape_attr_name(a.name, keep_layer=True)
                    # name = a.name.lower()
                    array_dict[name] = a.array
                elif a.data_type == DataType.array3d:
                    # Not sure how best to check if an object has array data
                    if a.array is None:
                        warn(
                            "Failed to get data for "
                            f"{a.name} array, {pak.name[0]} package"
                        )
                        continue
                    if isinstance(a.name, list) and a.name[0] == "thickness":
                        continue
                    if a.array.shape == horz_shape:
                        if hasattr(a, "shape"):
                            if a.shape[1] is None:  # usg unstructured Util3d
                                # return a flattened array, with a.name[0]
                                # (a per-layer list)
                                array_dict[a.name[0]] = a.array
                            else:
                                array_dict[a.name] = a.array
                        else:
                            array_dict[a.name] = a.array
                    else:
                        # array is not the same shape as the layer shape
                        for ilay in range(a.array.shape[0]):
                            try:
                                arr = a.array[ilay]
                            except:
                                arr = a[ilay]

                            if isinstance(a, Util3d):
                                aname = shape_attr_name(a[ilay].name)
                            else:
                                aname = a.name

                            if arr.shape == (1,) + horz_shape:
                                # fix for mf6 case
                                arr = arr[0]
                            assert arr.shape == horz_shape
                            name = f"{aname}_{ilay + 1}"
                            array_dict[name] = arr
                elif a.data_type == DataType.transient2d:
                    # elif isinstance(a, Transient2d):
                    # Not sure how best to check if an object has array data
                    try:
                        assert a.array is not None
                    except:
                        warn(
                            "Failed to get data for "
                            f"{a.name} array, {pak.name[0]} package"
                        )
                        continue
                    for kper in range(a.array.shape[0]):
                        name = f"{shape_attr_name(a.name)}{kper + 1}"
                        arr = a.array[kper][0]
                        assert arr.shape == horz_shape
                        array_dict[name] = arr
                elif a.data_type == DataType.transientlist:
                    # elif isinstance(a, MfList):
                    try:
                        list(a.masked_4D_arrays_itr())
                    except:
                        continue
                    for name, array in a.masked_4D_arrays_itr():
                        for kper in range(array.shape[0]):
                            for k in range(array.shape[1]):
                                n = shape_attr_name(name, length=4)
                                aname = f"{n}{k + 1}{kper + 1}"
                                arr = array[kper][k]
                                assert arr.shape == horz_shape
                                if np.all(np.isnan(arr)):
                                    continue
                                array_dict[aname] = arr
                elif isinstance(a, list):
                    for v in a:
                        if (
                            isinstance(a, DataInterface)
                            and v.data_type == DataType.array3d
                        ):
                            for ilay in range(a.model.modelgrid.nlay):
                                u2d = a[ilay]
                                name = (
                                    f"{shape_attr_name(u2d.name)}_{ilay + 1}"
                                )
                                arr = u2d.array
                                assert arr.shape == horz_shape
                                array_dict[name] = arr

    # write data arrays to a shapefile
    write_grid_shapefile(path, grid, array_dict)
    crs = kwargs.get("crs", None)
    prjfile = kwargs.get("prjfile", None)
    try:
        write_prj(path, grid, crs=crs, prjfile=prjfile)
    except ImportError:
        pass
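
# Example (sketch, not part of the module source): exporting selected package
# arrays from an existing model, plus one extra user-supplied 2D array. The
# model name, package list, and the BAS6 array are illustrative assumptions.
#
#     import flopy
#     from flopy.export.shapefile_utils import model_attributes_to_shapefile
#
#     m = flopy.modflow.Modflow.load("model.nam")
#     extra = {"ibound_1": m.bas6.ibound.array[0]}  # assumes a BAS6 package
#     model_attributes_to_shapefile(
#         "attributes.shp", m, package_names=["dis", "lpf"], array_dict=extra
#     )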


def shape_attr_name(name, length=6, keep_layer=False):
    """
    Function to format an array name to a maximum of 10 characters to
    conform with the ESRI shapefile maximum attribute name length

    Parameters
    ----------
    name : str
        data array name
    length : int
        maximum length of string to return. Value passed to function
        is overridden and set to 10 if keep_layer=True. (default is 6)
    keep_layer : bool
        Boolean that determines if layer number in name should be retained.
        (default is False)

    Returns
    -------
    str

    Examples
    --------
    >>> import flopy
    >>> name = flopy.utils.shape_attr_name('averylongstring')
    >>> name
    'averyl'

    """
    # kludges
    if name == "model_top":
        name = "top"

    # replace spaces with "_"
    n = name.lower().replace(" ", "_")

    # exclude "_layer_X" portion of string
    if keep_layer:
        length = 10
        n = n.replace("_layer", "_")
    else:
        try:
            idx = n.index("_layer")
            n = n[:idx]
        except:
            pass

    if len(n) > length:
        n = n[:length]

    return n


def enforce_10ch_limit(names: List[str], warnings: bool = True) -> List[str]:
    """Enforce 10 character limit for fieldnames.

    Add suffix for duplicate names starting at 0.

    Parameters
    ----------
    names : list of strings
    warnings : whether to warn if names are truncated

    Returns
    -------
    list
        list of unique strings of len <= 10.
    """

    def truncate(s):
        name = s[:5] + s[-4:] + "_"
        if warnings:
            warn(f"Truncating shapefile fieldname {s} to {name}")
        return name

    names = [truncate(n) if len(n) > 10 else n for n in names]
    dups = {x: names.count(x) for x in names}
    suffix = {n: list(range(cnt)) for n, cnt in dups.items() if cnt > 1}
    for i, n in enumerate(names):
        if dups[n] > 1:
            names[i] = n[:9] + str(suffix[n].pop(0))

    return names
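
# Example (sketch): a name longer than 10 characters is rebuilt from its
# first five and last four characters plus "_"; duplicates then receive a
# numeric suffix.
#
#     enforce_10ch_limit(["hydraulic_conductivity", "recharge"])
#     # -> ['hydravity_', 'recharge']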


def get_pyshp_field_info(dtypename):
    """Get pyshp dtype information for a given numpy dtype."""
    fields = {
        "int": ("N", 18, 0),
        "<i": ("N", 18, 0),
        "float": ("F", 20, 12),
        "<f": ("F", 20, 12),
        "bool": ("L", 1),
        "b1": ("L", 1),
        "str": ("C", 50),
        "object": ("C", 50),
    }
    k = [k for k in fields.keys() if k in dtypename.lower()]
    if len(k) == 1:
        return fields[k[0]]
    else:
        return fields["str"]
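
# Example (sketch): numpy dtype names map onto (type, size[, decimal]) pyshp
# field specifications; unrecognized dtypes fall back to the character field.
#
#     get_pyshp_field_info("float64")  # -> ("F", 20, 12)
#     get_pyshp_field_info("int32")    # -> ("N", 18, 0)
#     get_pyshp_field_info("<U10")     # -> ("C", 50), the fallback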


def get_pyshp_field_dtypes(code):
    """Returns a numpy dtype for a pyshp field type."""
    dtypes = {
        "N": int,
        "F": float,
        "L": bool,
        "C": object,
    }
    return dtypes.get(code, object)


def shp2recarray(shpname: Union[str, os.PathLike]):
    """Read a shapefile into a numpy recarray.

    Parameters
    ----------
    shpname : str or PathLike
        ESRI Shapefile path

    Returns
    -------
    np.recarray

    """
    from ..utils.geospatial_utils import GeoSpatialCollection

    sf = import_optional_dependency("shapefile")

    sfobj = sf.Reader(str(shpname))
    dtype = [
        (str(f[0]), get_pyshp_field_dtypes(f[1])) for f in sfobj.fields[1:]
    ]

    geoms = GeoSpatialCollection(sfobj).flopy_geometry
    records = [
        tuple(r) + (geoms[i],) for i, r in enumerate(sfobj.iterRecords())
    ]
    dtype += [("geometry", object)]

    recarray = np.array(records, dtype=dtype).view(np.recarray)
    return recarray
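
# Example (sketch, not part of the module source): reading back a shapefile
# written by write_grid_shapefile. The file name is illustrative.
#
#     from flopy.export.shapefile_utils import shp2recarray
#
#     ra = shp2recarray("grid.shp")
#     ra.dtype.names   # attribute fields plus a trailing "geometry" field
#     ra.geometry[0]   # flopy geometry object for the first record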


def recarray2shp(
    recarray,
    geoms,
    shpname: Union[str, os.PathLike] = "recarray.shp",
    mg=None,
    crs=None,
    prjfile: Optional[Union[str, os.PathLike]] = None,
    verbose=False,
    **kwargs,
):
    """
    Write a numpy record array to a shapefile, using a corresponding
    list of geometries. Method supports list of flopy geometry objects,
    flopy Collection object, shapely Collection object, and geojson
    Geometry Collection objects

    Parameters
    ----------
    recarray : np.recarray
        Numpy record array with attribute information that will go in the
        shapefile
    geoms : list of flopy.utils.geometry, shapely geometry collection,
            flopy geometry collection, shapefile.Shapes,
            list of shapefile.Shape objects, or geojson geometry collection
        The number of geometries in geoms must equal the number of records
        in recarray.
    shpname : str or PathLike, default "recarray.shp"
        Path for the output shapefile
    mg : flopy.discretization.Grid object
        flopy model grid
    crs : pyproj.CRS, int, str, optional if `prjfile` is specified
        Coordinate reference system (CRS) for the model grid
        (must be projected; geographic CRS are not supported).
        The value can be anything accepted by
        :meth:`pyproj.CRS.from_user_input() <pyproj.crs.CRS.from_user_input>`,
        such as an authority string (eg "EPSG:26916") or a WKT string.
    prjfile : str or pathlike, optional if `crs` is specified
        ESRI-style projection file with well-known text defining the CRS
        for the model grid (must be projected; geographic CRS are not
        supported).
    **kwargs : dict, optional
        Support deprecated keyword options.

        .. deprecated:: 3.5
            The following keyword options will be removed for FloPy 3.6:

              - ``epsg`` (int): use ``crs`` instead.
              - ``prj`` (str or PathLike): use ``prjfile`` instead.

    Notes
    -----
    Uses pyshp and optionally pyproj.
    """
    from ..utils.geospatial_utils import GeoSpatialCollection

    if len(recarray) != len(geoms):
        raise IndexError(
            "Number of geometries must equal the number of records!"
        )

    if len(recarray) == 0:
        raise Exception("Recarray is empty")

    geomtype = None

    geoms = GeoSpatialCollection(geoms).flopy_geometry

    for g in geoms:
        try:
            geomtype = g.shapeType
        except AttributeError:
            continue

    # set up for pyshp 2
    shapefile = import_optional_dependency("shapefile")
    w = shapefile.Writer(str(shpname), shapeType=geomtype)
    w.autoBalance = 1

    # set up the attribute fields
    names = enforce_10ch_limit(recarray.dtype.names)
    for i, npdtype in enumerate(recarray.dtype.descr):
        key = names[i]
        if not isinstance(key, str):
            key = str(key)
        w.field(key, *get_pyshp_field_info(npdtype[1]))

    # write the geometry and attributes for each record
    ralist = recarray.tolist()
    if geomtype == shapefile.POLYGON:
        for i, r in enumerate(ralist):
            w.poly(geoms[i].pyshp_parts)
            w.record(*r)
    elif geomtype == shapefile.POLYLINE:
        for i, r in enumerate(ralist):
            w.line(geoms[i].pyshp_parts)
            w.record(*r)
    elif geomtype == shapefile.POINT:
        # pyshp version 2.x w.point() method can only take x and y
        # code will need to be refactored in order to write POINTZ
        # shapes with the z attribute.
        for i, r in enumerate(ralist):
            w.point(*geoms[i].pyshp_parts[:2])
            w.record(*r)

    w.close()
    if verbose:
        print(f"wrote {flopy_io.relpath_safe(os.getcwd(), shpname)}")

    # handle deprecated projection kwargs; warnings are raised in crs.py
    write_prj_args = {}
    if "epsg" in kwargs:
        write_prj_args["epsg"] = kwargs.pop("epsg")
    if "prj" in kwargs:
        write_prj_args["prj"] = kwargs.pop("prj")
    if kwargs:
        raise TypeError(f"unhandled keywords: {kwargs}")

    # write the projection file
    try:
        write_prj(shpname, mg, crs=crs, prjfile=prjfile, **write_prj_args)
    except ImportError:
        if verbose:
            print("projection file not written")
    return
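
# Example (sketch, not part of the module source): writing a small POINT
# shapefile from a record array and a matching list of flopy Point objects.
# Coordinates, attribute values, and the CRS string are illustrative.
#
#     import numpy as np
#     from flopy.utils.geometry import Point
#     from flopy.export.shapefile_utils import recarray2shp
#
#     ra = np.array(
#         [(1, 10.5), (2, 12.0)], dtype=[("id", int), ("head", float)]
#     ).view(np.recarray)
#     pts = [Point(0.0, 0.0), Point(100.0, 50.0)]
#     recarray2shp(ra, pts, shpname="points.shp", crs="EPSG:26916")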


def write_prj(
    shpname,
    modelgrid=None,
    crs=None,
    prjfile=None,
    **kwargs,
):
    # projection file name
    output_projection_file = Path(shpname).with_suffix(".prj")

    # handle deprecated projection kwargs; warnings are raised in crs.py
    get_crs_args = {}
    if "epsg" in kwargs:
        get_crs_args["epsg"] = kwargs.pop("epsg")
    if "prj" in kwargs:
        get_crs_args["prj"] = kwargs.pop("prj")
    if "wkt_string" in kwargs:
        get_crs_args["wkt_string"] = kwargs.pop("wkt_string")
    if kwargs:
        raise TypeError(f"unhandled keywords: {kwargs}")

    crs = get_crs(prjfile=prjfile, crs=crs, **get_crs_args)
    if crs is None and modelgrid is not None:
        crs = modelgrid.crs
    if crs is not None:
        output_projection_file.write_text(crs.to_wkt(), encoding="utf-8")
    else:
        print(
            "No CRS information for writing a .prj file.\n"
            "Supply a valid coordinate reference system to the attached "
            "modelgrid object or to the .export() method."
        )
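
# Example (sketch, not part of the module source): writing a .prj sidecar for
# an existing shapefile from an EPSG authority string (requires pyproj). The
# file name and EPSG code are illustrative.
#
#     from flopy.export.shapefile_utils import write_prj
#
#     write_prj("grid.shp", crs="EPSG:26916")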