import os
from itertools import repeat
from pathlib import Path
from typing import Union
import numpy as np
from packaging.version import Version
from ..datbase import DataInterface, DataListInterface, DataType
from ..mbase import BaseModel, ModelInterface
from ..pakbase import PackageInterface
from ..utils import (
CellBudgetFile,
FormattedHeadFile,
HeadFile,
UcnFile,
ZBNetOutput,
flopy_io,
import_optional_dependency,
)
from ..utils.crs import get_crs
from . import NetCdf, netcdf, shapefile_utils, vtk
from .longnames import NC_LONG_NAMES
from .unitsformat import NC_UNITS_FORMAT
NC_PRECISION_TYPE = {
np.float64: "f8",
np.float32: "f4",
int: "i4",
np.int64: "i4",
np.int32: "i4",
}
[docs]def ensemble_helper(
inputs_filename: Union[str, os.PathLike],
outputs_filename: Union[str, os.PathLike],
models,
add_reals=True,
**kwargs,
):
"""
Helper to export an ensemble of model instances. Assumes
all models have same dis and reference information, only difference is
properties and boundary conditions. Assumes model.nam.split('_')[-1] is
the realization suffix to use in the netcdf variable names
"""
f_in, f_out = None, None
for m in models[1:]:
assert (
m.get_nrow_ncol_nlay_nper() == models[0].get_nrow_ncol_nlay_nper()
)
if inputs_filename is not None:
f_in = models[0].export(inputs_filename, **kwargs)
vdict = {}
vdicts = [models[0].export(vdict, **kwargs)]
i = 1
for m in models[1:]:
suffix = m.name.split(".")[0].split("_")[-1]
vdict = {}
m.export(vdict, **kwargs)
vdicts.append(vdict)
if add_reals:
f_in.append(vdict, suffix=suffix)
i += 1
mean, stdev = {}, {}
for vname in vdict.keys():
alist = []
for vd in vdicts:
alist.append(vd[vname])
alist = np.array(alist)
mean[vname] = alist.mean(axis=0)
stdev[vname] = alist.std(axis=0)
mean[vname][vdict[vname] == netcdf.FILLVALUE] = netcdf.FILLVALUE
stdev[vname][vdict[vname] == netcdf.FILLVALUE] = netcdf.FILLVALUE
mean[vname][np.isnan(vdict[vname])] = netcdf.FILLVALUE
stdev[vname][np.isnan(vdict[vname])] = netcdf.FILLVALUE
if i >= 2:
if not add_reals:
f_in.write()
f_in = NetCdf.empty_like(mean, output_filename=inputs_filename)
f_in.append(mean, suffix="**mean**")
f_in.append(stdev, suffix="**stdev**")
else:
f_in.append(mean, suffix="**mean**")
f_in.append(stdev, suffix="**stdev**")
f_in.add_global_attributes({"namefile": ""})
if outputs_filename is not None:
f_out = output_helper(
outputs_filename,
models[0],
models[0].load_results(as_dict=True),
**kwargs,
)
vdict = {}
vdicts = [
output_helper(
vdict,
models[0],
models[0].load_results(as_dict=True),
**kwargs,
)
]
i = 1
for m in models[1:]:
suffix = m.name.split(".")[0].split("_")[-1]
oudic = m.load_results(as_dict=True)
vdict = {}
output_helper(vdict, m, oudic, **kwargs)
vdicts.append(vdict)
if add_reals:
f_out.append(vdict, suffix=suffix)
i += 1
mean, stdev = {}, {}
for vname in vdict.keys():
alist = []
for vd in vdicts:
alist.append(vd[vname])
alist = np.array(alist)
mean[vname] = alist.mean(axis=0)
stdev[vname] = alist.std(axis=0)
mean[vname][np.isnan(vdict[vname])] = netcdf.FILLVALUE
stdev[vname][np.isnan(vdict[vname])] = netcdf.FILLVALUE
mean[vname][vdict[vname] == netcdf.FILLVALUE] = netcdf.FILLVALUE
stdev[vname][vdict[vname] == netcdf.FILLVALUE] = netcdf.FILLVALUE
if i >= 2:
if not add_reals:
f_out.write()
f_out = NetCdf.empty_like(
mean, output_filename=outputs_filename
)
f_out.append(mean, suffix="**mean**")
f_out.append(stdev, suffix="**stdev**")
else:
f_out.append(mean, suffix="**mean**")
f_out.append(stdev, suffix="**stdev**")
f_out.add_global_attributes({"namefile": ""})
return f_in, f_out
def _add_output_nc_variable(
nc,
times,
shape3d,
out_obj,
var_name,
logger=None,
text="",
mask_vals=(),
mask_array3d=None,
):
if logger:
logger.log(f"creating array for {var_name}")
array = np.zeros(
(len(times), shape3d[0], shape3d[1], shape3d[2]), dtype=np.float32
)
array[:] = np.nan
if isinstance(out_obj, ZBNetOutput):
a = np.asarray(out_obj.zone_array, dtype=np.float32)
if mask_array3d is not None:
a[mask_array3d] = np.nan
for i, _ in enumerate(times):
array[i, :, :, :] = a
else:
for i, t in enumerate(times):
if t in out_obj.recordarray["totim"]:
try:
if text:
a = out_obj.get_data(totim=t, full3D=True, text=text)
if isinstance(a, list):
a = a[0]
else:
a = out_obj.get_data(totim=t)
except Exception as e:
nme = var_name + text.decode().strip().lower()
estr = f"error getting data for {nme} at time {t}:{e!s}"
if logger:
logger.warn(estr)
else:
print(estr)
continue
if mask_array3d is not None and a.shape == mask_array3d.shape:
a[mask_array3d] = np.nan
try:
array[i, :, :, :] = a.astype(np.float32)
except Exception as e:
nme = var_name + text.decode().strip().lower()
estr = f"error assigning {nme} data to array for time {t}:{e!s}"
if logger:
logger.warn(estr)
else:
print(estr)
continue
if logger:
logger.log(f"creating array for {var_name}")
for mask_val in mask_vals:
array[np.where(array == mask_val)] = np.nan
mx, mn = np.nanmax(array), np.nanmin(array)
array[np.isnan(array)] = netcdf.FILLVALUE
if isinstance(nc, dict):
if text:
var_name = text.decode().strip().lower()
nc[var_name] = array
return nc
units = None
if var_name in NC_UNITS_FORMAT:
units = NC_UNITS_FORMAT[var_name].format(nc.grid_units, nc.time_units)
precision_str = "f4"
if text:
var_name = text.decode().strip().lower()
attribs = {"long_name": var_name}
attribs["coordinates"] = "time layer latitude longitude"
attribs["min"] = mn
attribs["max"] = mx
if units is not None:
attribs["units"] = units
try:
dim_tuple = ("time",) + nc.dimension_names
var = nc.create_variable(
var_name,
attribs,
precision_str=precision_str,
dimensions=dim_tuple,
)
except Exception as e:
estr = f"error creating variable {var_name}:\n{e!s}"
if logger:
logger.lraise(estr)
else:
raise Exception(estr)
try:
var[:] = array
except Exception as e:
estr = f"error setting array to variable {var_name}:\n{e!s}"
if logger:
logger.lraise(estr)
else:
raise Exception(estr)
def _add_output_nc_zonebudget_variable(nc, array, var_name, flux, logger=None):
"""
Method to add zonebudget output data to netcdf file
Parameters
----------
nc : NetCdf object
the NetCDF object
array : np.ndarray
zonebudget output budget group array
var_name : str
variable name
flux : bool
flag for flux data or volumetric data
logger : None or Logger
logger instance
"""
if logger:
logger.log(f"creating array for {var_name}")
mn = np.min(array)
mx = np.max(array)
precision_str = "f4"
if flux:
units = f"{nc.grid_units}^3/{nc.time_units}"
else:
units = f"{nc.grid_units}^3"
attribs = {"long_name": var_name}
attribs["coordinates"] = "time zone"
attribs["min"] = mn
attribs["max"] = mx
attribs["units"] = units
dim_tuple = ("time", "zone")
var = nc.create_group_variable(
"zonebudget", var_name, attribs, precision_str, dim_tuple
)
var[:] = array
[docs]def output_helper(
f: Union[str, os.PathLike, NetCdf, dict],
ml,
oudic,
verbose=False,
**kwargs,
):
"""
Export model outputs using the model spatial reference info.
Parameters
----------
f : str or PathLike or NetCdf or dict
filepath to write output to (must have .shp or .nc extension), NetCDF object, or dictionary
ml : flopy.mbase.ModelInterface derived type
oudic : dict
output_filename,flopy datafile/cellbudgetfile instance
verbose : bool
whether to show verbose output
**kwargs : keyword arguments
modelgrid : flopy.discretizaiton.Grid
user supplied model grid instance that will be used for export
in lieu of the models model grid instance
mflay : int
zero based model layer which can be used in shapefile exporting
kper : int
zero based stress period which can be used for shapefile exporting
Returns
-------
None
Note:
----
casts down double precision to single precision for netCDF files
"""
assert isinstance(ml, (BaseModel, ModelInterface))
assert len(oudic.keys()) > 0
logger = kwargs.pop("logger", None)
stride = kwargs.pop("stride", 1)
forgive = kwargs.pop("forgive", False)
kwargs.pop("suffix", None)
mask_vals = []
mflay = kwargs.pop("mflay", None)
kper = kwargs.pop("kper", None)
if "masked_vals" in kwargs:
mask_vals = kwargs.pop("masked_vals")
if len(kwargs) > 0 and logger is not None:
str_args = ",".join(kwargs)
logger.warn(f"unused kwargs: {str_args}")
zonebud = None
zbkey = None
for key, value in oudic.items():
if isinstance(value, ZBNetOutput):
zbkey = key
break
if zbkey is not None:
zonebud = oudic.pop(zbkey)
# ISSUE - need to round the totims in each output file instance so
# that they will line up
for key in oudic.keys():
out = oudic[key]
times = [float(f"{t:15.6f}") for t in out.recordarray["totim"]]
out.recordarray["totim"] = times
times = []
for filename, df in oudic.items():
for t in df.recordarray["totim"]:
if t not in times:
times.append(t)
if zonebud is not None and not oudic:
if isinstance(f, NetCdf):
times = f.time_values_arg
else:
times = zonebud.time
assert len(times) > 0
times.sort()
# rectify times - only use times that are common to every output file
common_times = []
skipped_times = []
for t in times:
keep = True
for filename, df in oudic.items():
if isinstance(df, ZBNetOutput):
continue
if t not in df.recordarray["totim"]:
keep = False
break
if keep:
common_times.append(t)
else:
skipped_times.append(t)
assert len(common_times) > 0
if len(skipped_times) > 0:
msg = (
"the following output times are not common to all "
f"output files and are being skipped:\n{skipped_times}"
)
if logger:
logger.warn(msg)
elif verbose:
print(msg)
times = [t for t in common_times[::stride]]
if (isinstance(f, str) or isinstance(f, Path)) and Path(
f
).suffix.lower() == ".nc":
f = NetCdf(
f, ml, time_values=times, logger=logger, forgive=forgive, **kwargs
)
elif isinstance(f, NetCdf):
otimes = list(f.nc.variables["time"][:])
assert otimes == times
if isinstance(f, NetCdf) or isinstance(f, dict):
shape3d = (ml.modelgrid.nlay, ml.modelgrid.nrow, ml.modelgrid.ncol)
mask_array3d = None
if ml.hdry is not None:
mask_vals.append(ml.hdry)
if ml.hnoflo is not None:
mask_vals.append(ml.hnoflo)
if ml.modelgrid.idomain is not None:
mask_array3d = ml.modelgrid.idomain == 0
for filename, out_obj in oudic.items():
filename = filename.lower()
if isinstance(out_obj, UcnFile):
_add_output_nc_variable(
f,
times,
shape3d,
out_obj,
"concentration",
logger=logger,
mask_vals=mask_vals,
mask_array3d=mask_array3d,
)
elif isinstance(out_obj, HeadFile):
_add_output_nc_variable(
f,
times,
shape3d,
out_obj,
out_obj.text.decode(),
logger=logger,
mask_vals=mask_vals,
mask_array3d=mask_array3d,
)
elif isinstance(out_obj, FormattedHeadFile):
_add_output_nc_variable(
f,
times,
shape3d,
out_obj,
out_obj.text,
logger=logger,
mask_vals=mask_vals,
mask_array3d=mask_array3d,
)
elif isinstance(out_obj, CellBudgetFile):
var_name = "cell_by_cell_flow"
for text in out_obj.textlist:
_add_output_nc_variable(
f,
times,
shape3d,
out_obj,
var_name,
logger=logger,
text=text,
mask_vals=mask_vals,
mask_array3d=mask_array3d,
)
else:
estr = f"unrecognized file extension:{filename}"
if logger:
logger.lraise(estr)
else:
raise Exception(estr)
if zonebud is not None:
try:
f.initialize_group(
"zonebudget",
dimensions=("time", "zone"),
dimension_data={
"time": zonebud.time,
"zone": zonebud.zones,
},
)
except AttributeError:
pass
for text, array in zonebud.arrays.items():
_add_output_nc_zonebudget_variable(
f, array, text, zonebud.flux, logger
)
# write the zone array to standard output
_add_output_nc_variable(
f,
times,
shape3d,
zonebud,
"budget_zones",
logger=logger,
mask_vals=mask_vals,
mask_array3d=mask_array3d,
)
elif (isinstance(f, str) or isinstance(f, Path)) and Path(
f
).suffix.lower() == ".shp":
attrib_dict = {}
for _, out_obj in oudic.items():
if (
isinstance(out_obj, HeadFile)
or isinstance(out_obj, FormattedHeadFile)
or isinstance(out_obj, UcnFile)
):
if isinstance(out_obj, UcnFile):
attrib_name = "conc"
else:
attrib_name = "head"
plotarray = np.atleast_3d(
out_obj.get_alldata().transpose()
).transpose()
for per in range(plotarray.shape[0]):
for k in range(plotarray.shape[1]):
if kper is not None and per != kper:
continue
if mflay is not None and k != mflay:
continue
name = f"{attrib_name}{per}_{k}"
attrib_dict[name] = plotarray[per][k]
elif isinstance(out_obj, CellBudgetFile):
names = out_obj.get_unique_record_names(decode=True)
for attrib_name in names:
plotarray = np.atleast_3d(
out_obj.get_data(text=attrib_name, full3D=True)
)
attrib_name = attrib_name.strip()
if attrib_name == "FLOW RIGHT FACE":
attrib_name = "FRF"
elif attrib_name == "FLOW FRONT FACE":
attrib_name = "FFF"
elif attrib_name == "FLOW LOWER FACE":
attrib_name = "FLF"
else:
pass
for per in range(plotarray.shape[0]):
for k in range(plotarray.shape[1]):
if kper is not None and per != kper:
continue
if mflay is not None and k != mflay:
continue
name = f"{attrib_name}{per}_{k}"
attrib_dict[name] = plotarray[per][k]
if attrib_dict:
shapefile_utils.write_grid_shapefile(f, ml.modelgrid, attrib_dict)
else:
msg = f"unrecognized export argument:{f}"
if logger:
logger.lraise(msg)
else:
raise NotImplementedError(msg)
return f
[docs]def model_export(
f: Union[str, os.PathLike, NetCdf, dict], ml, fmt=None, **kwargs
):
"""
Method to export a model to a shapefile or netcdf file
Parameters
----------
f : str or PathLike or NetCdf or dict
file path (".nc" for netcdf or ".shp" for shapefile) or NetCDF object or dictionary
ml : flopy.modflow.mbase.ModelInterface object
flopy model object
fmt : str
output format flag. 'vtk' will export to vtk
**kwargs : keyword arguments
modelgrid: flopy.discretization.Grid
user supplied modelgrid object which will supersede the built
in modelgrid object
crs : pyproj.CRS, int, str, optional if `prjfile` is specified
Coordinate reference system (CRS) for the model grid
(must be projected; geographic CRS are not supported).
The value can be anything accepted by
:meth:`pyproj.CRS.from_user_input() <pyproj.crs.CRS.from_user_input>`,
such as an authority string (eg "EPSG:26916") or a WKT string.
prjfile : str or pathlike, optional if `crs` is specified
ESRI-style projection file with well-known text defining the CRS
for the model grid (must be projected; geographic CRS are not supported).
if fmt is set to 'vtk', parameters of Vtk initializer
"""
assert isinstance(ml, ModelInterface)
package_names = kwargs.get("package_names", None)
if package_names is None:
package_names = [pak.name[0] for pak in ml.packagelist]
if (isinstance(f, str) or isinstance(f, Path)) and Path(
f
).suffix.lower() == ".nc":
f = NetCdf(f, ml, **kwargs)
if (isinstance(f, str) or isinstance(f, Path)) and Path(
f
).suffix.lower() == ".shp":
shapefile_utils.model_attributes_to_shapefile(
f, ml, package_names=package_names, **kwargs
)
elif isinstance(f, NetCdf):
for pak in ml.packagelist:
if pak.name[0] in package_names:
f = package_export(f, pak, **kwargs)
assert f is not None
return f
elif isinstance(f, dict):
for pak in ml.packagelist:
f = package_export(f, pak, **kwargs)
elif fmt == "vtk":
# call vtk model export
name = kwargs.get("name", ml.name)
xml = kwargs.get("xml", False)
masked_values = kwargs.get("masked_values", None)
pvd = kwargs.get("pvd", False)
smooth = kwargs.get("smooth", False)
point_scalars = kwargs.get("point_scalars", False)
binary = kwargs.get("binary", True)
vertical_exageration = kwargs.get("vertical_exageration", 1)
kpers = kwargs.get("kpers", None)
package_names = kwargs.get("package_names", None)
vtkobj = vtk.Vtk(
ml,
vertical_exageration=vertical_exageration,
binary=binary,
xml=xml,
pvd=pvd,
smooth=smooth,
point_scalars=point_scalars,
)
vtkobj.add_model(
ml, masked_values=masked_values, selpaklist=package_names
)
vtkobj.write(os.path.join(f, name), kpers)
else:
raise NotImplementedError(f"unrecognized export argument:{f}")
[docs]def package_export(
f: Union[str, os.PathLike, NetCdf, dict],
pak,
fmt=None,
verbose=False,
**kwargs,
):
"""
Method to export a package to shapefile or netcdf
Parameters
----------
f : str or PathLike or NetCdf or dict
output file path (extension .shp for shapefile or .nc for netcdf) or NetCDF object or dictionary
pak : flopy.pakbase.Package object
package to export
fmt : str
output format flag. 'vtk' will export to vtk
** kwargs : keyword arguments
modelgrid: flopy.discretization.Grid
user supplied modelgrid object which will supersede the built
in modelgrid object
crs : pyproj.CRS, int, str, optional if `prjfile` is specified
Coordinate reference system (CRS) for the model grid
(must be projected; geographic CRS are not supported).
The value can be anything accepted by
:meth:`pyproj.CRS.from_user_input() <pyproj.crs.CRS.from_user_input>`,
such as an authority string (eg "EPSG:26916") or a WKT string.
prjfile : str or pathlike, optional if `crs` is specified
ESRI-style projection file with well-known text defining the CRS
for the model grid (must be projected; geographic CRS are not supported).
if fmt is set to 'vtk', parameters of Vtk initializer
Returns
-------
f : NetCdf object or None
"""
assert isinstance(pak, PackageInterface)
if (isinstance(f, str) or isinstance(f, Path)) and Path(
f
).suffix.lower() == ".nc":
f = NetCdf(f, pak.parent, **kwargs)
if (isinstance(f, str) or isinstance(f, Path)) and Path(
f
).suffix.lower() == ".shp":
shapefile_utils.model_attributes_to_shapefile(
f, pak.parent, package_names=pak.name, verbose=verbose, **kwargs
)
elif isinstance(f, NetCdf) or isinstance(f, dict):
for a in pak.data_list:
if isinstance(a, DataInterface):
if a.array is not None:
if (
a.data_type == DataType.array2d
and len(a.array.shape) == 2
and a.array.shape[1] > 0
):
try:
f = array2d_export(f, a, **kwargs)
except:
f.logger.warn(f"error adding {a.name} as variable")
elif a.data_type == DataType.array3d:
f = array3d_export(f, a, **kwargs)
elif a.data_type == DataType.transient2d:
f = transient2d_export(f, a, **kwargs)
elif a.data_type == DataType.transientlist:
f = mflist_export(f, a, **kwargs)
elif isinstance(a, list):
for v in a:
if (
isinstance(a, DataInterface)
and v.data_type == DataType.array3d
):
f = array3d_export(f, v, **kwargs)
return f
elif fmt == "vtk":
# call vtk array export to folder
name = kwargs.get("name", pak.name[0])
xml = kwargs.get("xml", False)
masked_values = kwargs.get("masked_values", None)
pvd = kwargs.get("pvd", False)
smooth = kwargs.get("smooth", False)
point_scalars = kwargs.get("point_scalars", False)
binary = kwargs.get("binary", True)
vertical_exageration = kwargs.get("vertical_exageration", 1)
kpers = kwargs.get("kpers", None)
vtkobj = vtk.Vtk(
pak.parent,
vertical_exageration=vertical_exageration,
binary=binary,
xml=xml,
pvd=pvd,
smooth=smooth,
point_scalars=point_scalars,
)
vtkobj.add_package(pak, masked_values=masked_values)
vtkobj.write(os.path.join(f, name), kper=kpers)
else:
raise NotImplementedError(f"unrecognized export argument:{f}")
[docs]def generic_array_export(
f: Union[str, os.PathLike],
array,
var_name="generic_array",
dimensions=("time", "layer", "y", "x"),
precision_str="f4",
units="unitless",
**kwargs,
):
"""
Method to export a generic array to NetCdf
Parameters
----------
f : str or PathLike
filename or existing export instance type (NetCdf only for now)
array : np.ndarray
var_name : str
variable name
dimensions : tuple
netcdf dimensions
precision_str : str
binary precision string, default "f4"
units : string
units of array data
**kwargs : keyword arguments
model : flopy.modflow.mbase
flopy model object
"""
if (isinstance(f, str) or isinstance(f, Path)) and Path(
f
).suffix.lower() == ".nc":
assert "model" in kwargs.keys(), (
"creating a new netCDF using generic_array_helper requires a "
"'model' kwarg"
)
assert isinstance(kwargs["model"], BaseModel)
f = NetCdf(f, kwargs.pop("model"), **kwargs)
assert array.ndim == len(
dimensions
), "generic_array_helper() array.ndim != dimensions"
coords_dims = {
"time": "time",
"layer": "layer",
"y": "latitude",
"x": "longitude",
}
coords = " ".join([coords_dims[d] for d in dimensions])
mn = kwargs.pop("min", -1.0e9)
mx = kwargs.pop("max", 1.0e9)
long_name = kwargs.pop("long_name", var_name)
if len(kwargs) > 0:
f.logger.warn(
"generic_array_helper(): unrecognized kwargs:"
+ ",".join(kwargs.keys())
)
attribs = {"long_name": long_name}
attribs["coordinates"] = coords
attribs["units"] = units
attribs["min"] = mn
attribs["max"] = mx
if np.isnan(attribs["min"]) or np.isnan(attribs["max"]):
raise Exception(f"error processing {var_name}: all NaNs")
try:
var = f.create_variable(
var_name,
attribs,
precision_str=precision_str,
dimensions=dimensions,
)
except Exception as e:
estr = f"error creating variable {var_name}:\n{e!s}"
f.logger.warn(estr)
raise Exception(estr)
try:
var[:] = array
except Exception as e:
estr = f"error setting array to variable {var_name}:\n{e!s}"
f.logger.warn(estr)
raise Exception(estr)
return f
[docs]def mflist_export(f: Union[str, os.PathLike, NetCdf], mfl, **kwargs):
"""
export helper for MfList instances
Parameters
-----------
f : str or PathLike or NetCdf
file path or existing export instance type (NetCdf only for now)
mfl : MfList instance
**kwargs : keyword arguments
modelgrid : flopy.discretization.Grid
model grid instance which will supersede the flopy.model.modelgrid
crs : pyproj.CRS, int, str, optional if `prjfile` is specified
Coordinate reference system (CRS) for the model grid
(must be projected; geographic CRS are not supported).
The value can be anything accepted by
:meth:`pyproj.CRS.from_user_input() <pyproj.crs.CRS.from_user_input>`,
such as an authority string (eg "EPSG:26916") or a WKT string.
prjfile : str or pathlike, optional if `crs` is specified
ESRI-style projection file with well-known text defining the CRS
for the model grid (must be projected; geographic CRS are not supported).
"""
if not isinstance(mfl, (DataListInterface, DataInterface)):
err = (
"mflist_helper only helps instances that support "
"DataListInterface"
)
raise AssertionError(err)
modelgrid = mfl.model.modelgrid
if "modelgrid" in kwargs:
modelgrid = kwargs.pop("modelgrid")
if (isinstance(f, str) or isinstance(f, Path)) and Path(
f
).suffix.lower() == ".nc":
f = NetCdf(f, mfl.model, **kwargs)
if (isinstance(f, str) or isinstance(f, Path)) and Path(
f
).suffix.lower() == ".shp":
sparse = kwargs.get("sparse", False)
kper = kwargs.get("kper", 0)
squeeze = kwargs.get("squeeze", True)
if modelgrid is None:
raise Exception("MfList.to_shapefile: modelgrid is not set")
elif modelgrid.grid_type == "USG-Unstructured":
raise Exception(
"Flopy does not support exporting to shapefile "
"from a MODFLOW-USG unstructured grid."
)
if kper is None:
keys = mfl.data.keys()
keys.sort()
else:
keys = [kper]
if not sparse:
array_dict = {}
for kk in keys:
arrays = mfl.to_array(kk)
for name, array in arrays.items():
for k in range(array.shape[0]):
# aname = name+"{0:03d}_{1:02d}".format(kk, k)
n = shapefile_utils.shape_attr_name(name, length=4)
aname = f"{n}{k + 1}{int(kk) + 1}"
array_dict[aname] = array[k]
shapefile_utils.write_grid_shapefile(f, modelgrid, array_dict)
else:
from ..export.shapefile_utils import recarray2shp
from ..utils.geometry import Polygon
df = mfl.get_dataframe(squeeze=squeeze)
if "kper" in kwargs or df is None:
ra = mfl[kper]
verts = np.array(modelgrid.get_cell_vertices(ra.i, ra.j))
elif df is not None:
verts = np.array(
[
modelgrid.get_cell_vertices(i, df.j.values[ix])
for ix, i in enumerate(df.i.values)
]
)
ra = df.to_records(index=False)
crs = kwargs.get("crs", None)
prjfile = kwargs.get("prjfile", None)
polys = np.array([Polygon(v) for v in verts])
recarray2shp(
ra,
geoms=polys,
shpname=f,
mg=modelgrid,
crs=crs,
prjfile=prjfile,
)
elif isinstance(f, NetCdf) or isinstance(f, dict):
base_name = mfl.package.name[0].lower()
# f.log("getting 4D masked arrays for {0}".format(base_name))
# m4d = mfl.masked_4D_arrays
# f.log("getting 4D masked arrays for {0}".format(base_name))
# for name, array in m4d.items():
for name, array in mfl.masked_4D_arrays_itr():
var_name = f"{base_name}_{name}"
if isinstance(f, dict):
f[var_name] = array
continue
f.log(f"processing {name} attribute")
units = None
if var_name in NC_UNITS_FORMAT:
units = NC_UNITS_FORMAT[var_name].format(
f.grid_units, f.time_units
)
precision_str = NC_PRECISION_TYPE[mfl.dtype[name].type]
if var_name in NC_LONG_NAMES:
attribs = {"long_name": NC_LONG_NAMES[var_name]}
else:
attribs = {"long_name": var_name}
attribs["coordinates"] = "time layer latitude longitude"
attribs["min"] = np.nanmin(array)
attribs["max"] = np.nanmax(array)
if np.isnan(attribs["min"]) or np.isnan(attribs["max"]):
raise Exception(f"error processing {var_name}: all NaNs")
if units is not None:
attribs["units"] = units
try:
dim_tuple = ("time",) + f.dimension_names
var = f.create_variable(
var_name,
attribs,
precision_str=precision_str,
dimensions=dim_tuple,
)
except Exception as e:
estr = f"error creating variable {var_name}:\n{e!s}"
f.logger.warn(estr)
raise Exception(estr)
array[np.isnan(array)] = f.fillvalue
try:
var[:] = array
except Exception as e:
estr = f"error setting array to variable {var_name}:\n{e!s}"
f.logger.warn(estr)
raise Exception(estr)
f.log(f"processing {name} attribute")
return f
else:
raise NotImplementedError(f"unrecognized export argument:{f}")
[docs]def transient2d_export(f: Union[str, os.PathLike], t2d, fmt=None, **kwargs):
"""
export helper for Transient2d instances
Parameters
-----------
f : str or PathLike
filename or existing export instance type (NetCdf only for now)
t2d : Transient2d instance
fmt : str
output format flag. 'vtk' will export to vtk
**kwargs : keyword arguments
min_valid : minimum valid value
max_valid : maximum valid value
modelgrid : flopy.discretization.Grid
model grid instance which will supersede the flopy.model.modelgrid
if fmt is set to 'vtk', parameters of Vtk initializer
"""
if not isinstance(t2d, DataInterface):
err = (
"transient2d_helper only helps instances that support "
"DataInterface"
)
raise AssertionError(err)
min_valid = kwargs.get("min_valid", -1.0e9)
max_valid = kwargs.get("max_valid", 1.0e9)
modelgrid = t2d.model.modelgrid
if "modelgrid" in kwargs:
modelgrid = kwargs.pop("modelgrid")
if (isinstance(f, str) or isinstance(f, Path)) and Path(
f
).suffix.lower() == ".nc":
f = NetCdf(f, t2d.model, **kwargs)
if (isinstance(f, str) or isinstance(f, Path)) and Path(
f
).suffix.lower() == ".shp":
array_dict = {}
for kper in range(t2d.model.modeltime.nper):
u2d = t2d[kper]
name = f"{shapefile_utils.shape_attr_name(u2d.name)}_{kper + 1}"
array_dict[name] = u2d.array
shapefile_utils.write_grid_shapefile(f, modelgrid, array_dict)
elif isinstance(f, NetCdf) or isinstance(f, dict):
# mask the array is defined by any row col with at lease
# one active cell
mask = None
if modelgrid.idomain is not None:
ibnd = np.abs(modelgrid.idomain).sum(axis=0)
mask = ibnd == 0
# f.log("getting 4D array for {0}".format(t2d.name_base))
array = t2d.array
# f.log("getting 4D array for {0}".format(t2d.name_base))
with np.errstate(invalid="ignore"):
if array.dtype not in [int, np.int32, np.int64]:
if mask is not None:
array[:, 0, mask] = np.nan
array[array <= min_valid] = np.nan
array[array >= max_valid] = np.nan
mx, mn = np.nanmax(array), np.nanmin(array)
else:
mx, mn = np.nanmax(array), np.nanmin(array)
array[array <= min_valid] = netcdf.FILLVALUE
array[array >= max_valid] = netcdf.FILLVALUE
# if t2d.model.bas6 is not None:
# array[:, 0, t2d.model.bas6.ibound.array[0] == 0] = \
# f.fillvalue
# elif t2d.model.btn is not None:
# array[:, 0, t2d.model.btn.icbund.array[0] == 0] = \
# f.fillvalue
var_name = t2d.name.replace("_", "")
if isinstance(f, dict):
array[array == netcdf.FILLVALUE] = np.nan
f[var_name] = array
return f
array[np.isnan(array)] = f.fillvalue
units = "unitless"
if var_name in NC_UNITS_FORMAT:
units = NC_UNITS_FORMAT[var_name].format(
f.grid_units, f.time_units
)
try:
precision_str = NC_PRECISION_TYPE[t2d.dtype]
except:
precision_str = NC_PRECISION_TYPE[t2d.dtype.type]
if var_name in NC_LONG_NAMES:
attribs = {"long_name": NC_LONG_NAMES[var_name]}
else:
attribs = {"long_name": var_name}
attribs["coordinates"] = "time layer latitude longitude"
attribs["units"] = units
attribs["min"] = mn
attribs["max"] = mx
if np.isnan(attribs["min"]) or np.isnan(attribs["max"]):
raise Exception(f"error processing {var_name}: all NaNs")
try:
dim_tuple = ("time",) + f.dimension_names
var = f.create_variable(
var_name,
attribs,
precision_str=precision_str,
dimensions=dim_tuple,
)
except Exception as e:
estr = f"error creating variable {var_name}:\n{e!s}"
f.logger.warn(estr)
raise Exception(estr)
try:
var[:, 0] = array
except Exception as e:
estr = f"error setting array to variable {var_name}:\n{e!s}"
f.logger.warn(estr)
raise Exception(estr)
return f
elif fmt == "vtk":
name = kwargs.get("name", t2d.name)
xml = kwargs.get("xml", False)
masked_values = kwargs.get("masked_values", None)
pvd = kwargs.get("pvd", False)
smooth = kwargs.get("smooth", False)
point_scalars = kwargs.get("point_scalars", False)
binary = kwargs.get("binary", True)
vertical_exageration = kwargs.get("vertical_exageration", 1)
kpers = kwargs.get("kpers", None)
if t2d.array is not None:
if hasattr(t2d, "transient_2ds"):
d = t2d.transient_2ds
else:
d = {ix: i for ix, i in enumerate(t2d.array)}
else:
raise AssertionError("No data available to export")
vtkobj = vtk.Vtk(
t2d.model,
vertical_exageration=vertical_exageration,
binary=binary,
xml=xml,
pvd=pvd,
smooth=smooth,
point_scalars=point_scalars,
)
vtkobj.add_transient_array(d, name=name, masked_values=masked_values)
vtkobj.write(os.path.join(f, name), kper=kpers)
else:
raise NotImplementedError(f"unrecognized export argument:{f}")
[docs]def array3d_export(f: Union[str, os.PathLike], u3d, fmt=None, **kwargs):
"""
export helper for Transient2d instances
Parameters
-----------
f : str or PathLike
filename or existing export instance type (NetCdf only for now)
u3d : Util3d instance
fmt : str
output format flag. 'vtk' will export to vtk
**kwargs : keyword arguments
min_valid : minimum valid value
max_valid : maximum valid value
modelgrid : flopy.discretization.Grid
model grid instance which will supersede the flopy.model.modelgrid
if fmt is set to 'vtk', parameters of Vtk initializer
"""
assert isinstance(
u3d, DataInterface
), "array3d_export only helps instances that support DataInterface"
min_valid = kwargs.get("min_valid", -1.0e9)
max_valid = kwargs.get("max_valid", 1.0e9)
modelgrid = u3d.model.modelgrid
if "modelgrid" in kwargs:
modelgrid = kwargs.pop("modelgrid")
if (isinstance(f, str) or isinstance(f, Path)) and Path(
f
).suffix.lower() == ".nc":
f = NetCdf(f, u3d.model, **kwargs)
if (isinstance(f, str) or isinstance(f, Path)) and Path(
f
).suffix.lower() == ".shp":
array_dict = {}
for ilay in range(modelgrid.nlay):
u2d = u3d[ilay]
if isinstance(u2d, np.ndarray):
dname = u3d.name
array = u2d
else:
dname = u2d.name
array = u2d.array
name = f"{shapefile_utils.shape_attr_name(dname)}_{ilay + 1}"
array_dict[name] = array
shapefile_utils.write_grid_shapefile(f, modelgrid, array_dict)
elif isinstance(f, NetCdf) or isinstance(f, dict):
var_name = u3d.name
if isinstance(var_name, list) or isinstance(var_name, tuple):
var_name = var_name[0]
var_name = var_name.replace(" ", "_").lower()
# f.log("getting 3D array for {0}".format(var_name))
array = u3d.array
# this is for the crappy vcont in bcf6
# if isinstance(f,NetCdf) and array.shape != f.shape:
# f.log("broadcasting 3D array for {0}".format(var_name))
# full_array = np.empty(f.shape)
# full_array[:] = np.nan
# full_array[:array.shape[0]] = array
# array = full_array
# f.log("broadcasting 3D array for {0}".format(var_name))
# f.log("getting 3D array for {0}".format(var_name))
#
mask = None
if modelgrid.idomain is not None and "ibound" not in var_name:
mask = modelgrid.idomain == 0
if mask is not None and array.shape != mask.shape:
# f.log("broadcasting 3D array for {0}".format(var_name))
full_array = np.empty(mask.shape)
full_array[:] = np.nan
full_array[: array.shape[0]] = array
array = full_array
# f.log("broadcasting 3D array for {0}".format(var_name))
# runtime warning issued in some cases - need to track down cause
# happens when NaN is already in array
with np.errstate(invalid="ignore"):
if array.dtype not in [int, np.int32, np.int64]:
# if u3d.model.modelgrid.bas6 is not None and "ibound" not
# in var_name:
# array[u3d.model.modelgrid.bas6.ibound.array == 0] =
# np.nan
# elif u3d.model.btn is not None and 'icbund' not in var_name:
# array[u3d.model.modelgrid.btn.icbund.array == 0] = np.nan
if mask is not None:
array[mask] = np.nan
array[array <= min_valid] = np.nan
array[array >= max_valid] = np.nan
mx, mn = np.nanmax(array), np.nanmin(array)
else:
mx, mn = np.nanmax(array), np.nanmin(array)
if mask is not None:
array[mask] = netcdf.FILLVALUE
array[array <= min_valid] = netcdf.FILLVALUE
array[array >= max_valid] = netcdf.FILLVALUE
if modelgrid.idomain is not None and "ibound" not in var_name:
array[modelgrid.idomain == 0] = netcdf.FILLVALUE
if isinstance(f, dict):
f[var_name] = array
return f
array[np.isnan(array)] = f.fillvalue
units = "unitless"
if var_name in NC_UNITS_FORMAT:
units = NC_UNITS_FORMAT[var_name].format(
f.grid_units, f.time_units
)
precision_str = NC_PRECISION_TYPE[u3d.dtype]
if var_name in NC_LONG_NAMES:
attribs = {"long_name": NC_LONG_NAMES[var_name]}
else:
attribs = {"long_name": var_name}
attribs["coordinates"] = "layer latitude longitude"
attribs["units"] = units
attribs["min"] = mn
attribs["max"] = mx
if np.isnan(attribs["min"]) or np.isnan(attribs["max"]):
raise Exception(f"error processing {var_name}: all NaNs")
try:
var = f.create_variable(
var_name,
attribs,
precision_str=precision_str,
dimensions=f.dimension_names,
)
except Exception as e:
estr = f"error creating variable {var_name}:\n{e!s}"
f.logger.warn(estr)
raise Exception(estr)
try:
var[:] = array
except Exception as e:
estr = f"error setting array to variable {var_name}:\n{e!s}"
f.logger.warn(estr)
raise Exception(estr)
return f
elif fmt == "vtk":
# call vtk array export to folder
name = kwargs.get("name", u3d.name)
xml = kwargs.get("xml", False)
masked_values = kwargs.get("masked_values", None)
smooth = kwargs.get("smooth", False)
point_scalars = kwargs.get("point_scalars", False)
binary = kwargs.get("binary", True)
vertical_exageration = kwargs.get("vertical_exageration", 1)
if isinstance(name, list) or isinstance(name, tuple):
name = name[0]
vtkobj = vtk.Vtk(
u3d.model,
smooth=smooth,
point_scalars=point_scalars,
binary=binary,
xml=xml,
vertical_exageration=vertical_exageration,
)
vtkobj.add_array(u3d.array, name, masked_values=masked_values)
vtkobj.write(os.path.join(f, name))
else:
raise NotImplementedError(f"unrecognized export argument:{f}")
[docs]def array2d_export(
f: Union[str, os.PathLike], u2d, fmt=None, verbose=False, **kwargs
):
"""
export helper for Util2d instances
Parameters
----------
f : str or PathLike
filename or existing export instance type (NetCdf only for now)
u2d : Util2d instance
fmt : str
output format flag. 'vtk' will export to vtk
verbose : bool
whether to print verbose output
**kwargs : keyword arguments
min_valid : minimum valid value
max_valid : maximum valid value
modelgrid : flopy.discretization.Grid
model grid instance which will supersede the flopy.model.modelgrid
if fmt is set to 'vtk', parameters of Vtk initializer
"""
assert isinstance(
u2d, DataInterface
), "util2d_helper only helps instances that support DataInterface"
assert len(u2d.array.shape) == 2, "util2d_helper only supports 2D arrays"
min_valid = kwargs.get("min_valid", -1.0e9)
max_valid = kwargs.get("max_valid", 1.0e9)
modelgrid = u2d.model.modelgrid
if "modelgrid" in kwargs:
modelgrid = kwargs.pop("modelgrid")
if (isinstance(f, str) or isinstance(f, Path)) and Path(
f
).suffix.lower() == ".nc":
f = NetCdf(f, u2d.model, **kwargs)
if (isinstance(f, str) or isinstance(f, Path)) and Path(
f
).suffix.lower() == ".shp":
name = shapefile_utils.shape_attr_name(u2d.name, keep_layer=True)
shapefile_utils.write_grid_shapefile(
f, modelgrid, {name: u2d.array}, verbose=verbose
)
return
elif (isinstance(f, str) or isinstance(f, Path)) and Path(
f
).suffix.lower() == ".asc":
export_array(modelgrid, f, u2d.array, **kwargs)
return
elif isinstance(f, NetCdf) or isinstance(f, dict):
# try to mask the array - assume layer 1 ibound is a good mask
# f.log("getting 2D array for {0}".format(u2d.name))
array = u2d.array
# f.log("getting 2D array for {0}".format(u2d.name))
with np.errstate(invalid="ignore"):
if array.dtype not in [int, np.int32, np.int64]:
if (
modelgrid.idomain is not None
and "ibound" not in u2d.name.lower()
and "idomain" not in u2d.name.lower()
):
array[modelgrid.idomain[0, :, :] == 0] = np.nan
array[array <= min_valid] = np.nan
array[array >= max_valid] = np.nan
mx, mn = np.nanmax(array), np.nanmin(array)
else:
mx, mn = np.nanmax(array), np.nanmin(array)
array[array <= min_valid] = netcdf.FILLVALUE
array[array >= max_valid] = netcdf.FILLVALUE
if (
modelgrid.idomain is not None
and "ibound" not in u2d.name.lower()
and "idomain" not in u2d.name.lower()
and "icbund" not in u2d.name.lower()
):
array[modelgrid.idomain[0, :, :] == 0] = netcdf.FILLVALUE
var_name = u2d.name
if isinstance(f, dict):
f[var_name] = array
return f
array[np.isnan(array)] = f.fillvalue
units = "unitless"
if var_name in NC_UNITS_FORMAT:
units = NC_UNITS_FORMAT[var_name].format(
f.grid_units, f.time_units
)
precision_str = NC_PRECISION_TYPE[u2d.dtype]
if var_name in NC_LONG_NAMES:
attribs = {"long_name": NC_LONG_NAMES[var_name]}
else:
attribs = {"long_name": var_name}
attribs["coordinates"] = "latitude longitude"
attribs["units"] = units
attribs["min"] = mn
attribs["max"] = mx
if np.isnan(attribs["min"]) or np.isnan(attribs["max"]):
raise Exception(f"error processing {var_name}: all NaNs")
try:
var = f.create_variable(
var_name,
attribs,
precision_str=precision_str,
dimensions=f.dimension_names[1:],
)
except Exception as e:
estr = f"error creating variable {var_name}:\n{e!s}"
f.logger.warn(estr)
raise Exception(estr)
try:
var[:] = array
except Exception as e:
estr = f"error setting array to variable {var_name}:\n{e!s}"
f.logger.warn(estr)
raise Exception(estr)
return f
elif fmt == "vtk":
name = kwargs.get("name", u2d.name)
xml = kwargs.get("xml", False)
masked_values = kwargs.get("masked_values", None)
smooth = kwargs.get("smooth", False)
point_scalars = kwargs.get("point_scalars", False)
binary = kwargs.get("binary", True)
vertical_exageration = kwargs.get("vertical_exageration", 1)
if isinstance(name, list) or isinstance(name, tuple):
name = name[0]
vtkobj = vtk.Vtk(
u2d.model,
smooth=smooth,
point_scalars=point_scalars,
binary=binary,
xml=xml,
vertical_exageration=vertical_exageration,
)
array = np.ones((modelgrid.nnodes,)) * np.nan
array[0 : u2d.array.size] = np.ravel(u2d.array)
vtkobj.add_array(array, name, masked_values=masked_values)
vtkobj.write(os.path.join(f, name))
else:
raise NotImplementedError(f"unrecognized export argument:{f}")
[docs]def export_array(
modelgrid,
filename: Union[str, os.PathLike],
a,
nodata=-9999,
fieldname="value",
verbose=False,
**kwargs,
):
"""
Write a numpy array to Arc Ascii grid or shapefile with the model
reference.
Parameters
----------
modelgrid : flopy.discretization.StructuredGrid object
model grid
filename : str or PathLike
Path of output file. Export format is determined by
file extension.
'.asc' Arc Ascii grid
'.tif' GeoTIFF (requires rasterio package)
'.shp' Shapefile
a : 2D numpy.ndarray
Array to export
nodata : scalar
Value to assign to np.nan entries (default -9999)
fieldname : str
Attribute field name for array values (shapefile export only).
(default 'values')
verbose : bool, optional, default False
whether to show verbose output
kwargs:
keyword arguments to np.savetxt (ascii)
rasterio.open (GeoTIFF)
or flopy.export.shapefile_utils.write_grid_shapefile
Notes
-----
Rotated grids will be either be unrotated prior to export,
using scipy.ndimage.rotate (Arc Ascii format) or rotation will be
included in their transform property (GeoTiff format). In either case
the pixels will be displayed in the (unrotated) projected geographic
coordinate system, so the pixels will no longer align exactly with the
model grid (as displayed from a shapefile, for example). A key difference
between Arc Ascii and GeoTiff (besides disk usage) is that the
unrotated Arc Ascii will have a different grid size, whereas the GeoTiff
will have the same number of rows and pixels as the original.
"""
filename = str(filename)
if filename.lower().endswith(".asc"):
if (
len(np.unique(modelgrid.delr))
!= len(np.unique(modelgrid.delc))
!= 1
or modelgrid.delr[0] != modelgrid.delc[0]
):
raise ValueError("Arc ascii arrays require a uniform grid.")
xoffset, yoffset = modelgrid.xoffset, modelgrid.yoffset
cellsize = modelgrid.delr[0]
fmt = kwargs.get("fmt", "%.18e")
a = a.copy()
a[np.isnan(a)] = nodata
if modelgrid.angrot != 0:
ndimage = import_optional_dependency(
"scipy.ndimage",
error_message="exporting rotated grids requires SciPy.",
)
a = ndimage.rotate(a, modelgrid.angrot, cval=nodata)
height_rot, width_rot = a.shape
xmin, xmax, ymin, ymax = modelgrid.extent
dx = (xmax - xmin) / width_rot
dy = (ymax - ymin) / height_rot
cellsize = np.max((dx, dy))
xoffset, yoffset = xmin, ymin
filename = (
".".join(filename.split(".")[:-1]) + ".asc"
) # enforce .asc ending
nrow, ncol = a.shape
a[np.isnan(a)] = nodata
txt = f"ncols {ncol}\n"
txt += f"nrows {nrow}\n"
txt += f"xllcorner {xoffset:f}\n"
txt += f"yllcorner {yoffset:f}\n"
txt += f"cellsize {cellsize}\n"
# ensure that nodata fmt consistent w values
txt += f"NODATA_value {fmt}\n" % (nodata)
with open(filename, "w") as output:
output.write(txt)
with open(filename, "ab") as output:
np.savetxt(output, a, **kwargs)
if verbose:
print(f"wrote {flopy_io.relpath_safe(filename)}")
elif filename.lower().endswith(".tif"):
if (
len(np.unique(modelgrid.delr))
!= len(np.unique(modelgrid.delc))
!= 1
or modelgrid.delr[0] != modelgrid.delc[0]
):
raise ValueError("GeoTIFF export require a uniform grid.")
rasterio = import_optional_dependency(
"rasterio",
error_message="GeoTIFF export requires the rasterio.",
)
dxdy = modelgrid.delc[0]
# because this is only implemented for a structured grid,
# we can get the xul and yul coordinate from modelgrid.xvertices(0, 0)
verts = modelgrid.get_cell_vertices(0, 0)
xul, yul = verts[0]
trans = (
rasterio.Affine.translation(xul, yul)
* rasterio.Affine.rotation(modelgrid.angrot)
* rasterio.Affine.scale(dxdy, -dxdy)
)
# third dimension is the number of bands
a = a.copy()
if len(a.shape) == 2:
a = np.reshape(a, (1, a.shape[0], a.shape[1]))
if a.dtype.name == "int64":
a = a.astype("int32")
dtype = rasterio.int32
elif a.dtype.name == "int32":
dtype = rasterio.int32
elif a.dtype.name == "float64":
dtype = rasterio.float64
elif a.dtype.name == "float32":
dtype = rasterio.float32
else:
msg = f'ERROR: invalid dtype "{a.dtype.name}"'
raise TypeError(msg)
meta = {
"count": a.shape[0],
"width": a.shape[2],
"height": a.shape[1],
"nodata": nodata,
"dtype": dtype,
"driver": "GTiff",
"crs": modelgrid.proj4,
"transform": trans,
}
meta.update(kwargs)
with rasterio.open(filename, "w", **meta) as dst:
dst.write(a)
if verbose:
print(f"wrote {flopy_io.relpath_safe(filename)}")
elif filename.lower().endswith(".shp"):
from ..export.shapefile_utils import write_grid_shapefile
try:
crs = get_crs(**kwargs)
except ImportError:
crs = None
write_grid_shapefile(
filename,
modelgrid,
array_dict={fieldname: a},
nan_val=nodata,
crs=crs,
)
[docs]def export_contours(
filename: Union[str, os.PathLike],
contours,
fieldname="level",
verbose=False,
**kwargs,
):
"""
Convert matplotlib contour plot object to shapefile.
Parameters
----------
filename : str or PathLike
path of output shapefile
contours : matplotlib.contour.QuadContourSet or list of them
(object returned by matplotlib.pyplot.contour)
fieldname : str
gis attribute table field name
verbose : bool, optional, default False
whether to show verbose output
**kwargs : key-word arguments to flopy.export.shapefile_utils.recarray2shp
Returns
-------
df : dataframe of shapefile contents
"""
from importlib.metadata import version
from matplotlib.path import Path
from ..utils.geometry import LineString
from .shapefile_utils import recarray2shp
if not isinstance(contours, list):
contours = [contours]
# Export a linestring for each contour component.
# Levels may have multiple disconnected components.
geoms = []
level = []
# ContourSet.collections was deprecated with
# matplotlib 3.8. ContourSet is a collection
# of Paths, where each Path corresponds to a
# contour level, and may contain one or more
# (possibly disconnected) components. Before
# 3.8, iterating over ContourSet.collections
# and enumerating from get_paths() suffices,
# but post-3.8, we have to walk the segments
# to distinguish disconnected components.
mpl_ver = Version(version("matplotlib"))
for ctr in contours:
if mpl_ver < Version("3.8.0"):
levels = ctr.levels
for i, c in enumerate(ctr.collections):
paths = c.get_paths()
geoms += [LineString(p.vertices) for p in paths]
level += list(np.ones(len(paths)) * levels[i])
else:
paths = ctr.get_paths()
for pi, path in enumerate(paths):
# skip empty paths
if path.vertices.shape[0] == 0:
continue
# Each Path may contain multiple components
# so we unpack them as separate geometries.
lines = []
segs = []
for seg in path.iter_segments():
pts, code = seg
if code == Path.MOVETO:
if len(segs) > 0:
lines.append(LineString(segs))
segs = []
segs.append(pts)
elif code == Path.LINETO:
segs.append(pts)
elif code == Path.CLOSEPOLY:
segs.append(pts)
segs.append(segs[0]) # add closing segment
lines.append(LineString(segs))
segs = []
if len(segs) > 0:
lines.append(LineString(segs))
level += list(np.ones(len(lines)) * ctr.levels[pi])
geoms += lines
if verbose:
print(f"Writing {len(level)} contour lines")
ra = np.array(level, dtype=[(fieldname, float)]).view(np.recarray)
recarray2shp(ra, geoms, filename, **kwargs)
[docs]def export_contourf(
filename, contours, fieldname="level", verbose=False, **kwargs
):
"""
Write matplotlib filled contours to shapefile.
Parameters
----------
filename : str or PathLike
name of output shapefile (e.g. myshp.shp)
contours : matplotlib.contour.QuadContourSet or list of them
(object returned by matplotlib.pyplot.contourf)
fieldname : str
Name of shapefile attribute field to contain the contour level. The
fieldname column in the attribute table will contain the lower end of
the range represented by the polygon. Default is 'level'.
verbose : bool, optional, default False
whether to show verbose output
**kwargs : keyword arguments to flopy.export.shapefile_utils.recarray2shp
Returns
-------
None
Examples
--------
>>> import flopy
>>> import matplotlib.pyplot as plt
>>> from flopy.export.utils import export_contourf
>>> a = np.random.random((10, 10))
>>> cs = plt.contourf(a)
>>> export_contourf('myfilledcontours.shp', cs)
"""
from importlib.metadata import version
from matplotlib.path import Path
from ..utils.geometry import Polygon, is_clockwise
from .shapefile_utils import recarray2shp
if not isinstance(contours, list):
contours = [contours]
# export a polygon for each filled contour
geoms = []
level = []
# ContourSet.collections was deprecated with
# matplotlib 3.8. ContourSet is a collection
# of Paths, where each Path corresponds to a
# contour level, and may contain one or more
# (possibly disconnected) components. Before
# 3.8, iterating over ContourSet.collections
# and enumerating from get_paths() suffices,
# but post-3.8, we have to walk the segments
# to distinguish disconnected components.
mpl_ver = Version(version("matplotlib"))
for ctr in contours:
if mpl_ver < Version("3.8.0"):
levels = ctr.levels
for idx, col in enumerate(ctr.collections):
for contour_path in col.get_paths():
# Create the polygon(s) for this intensity level
poly = None
for ncp, cp in enumerate(contour_path.to_polygons()):
x = cp[:, 0]
y = cp[:, 1]
verts = [(i[0], i[1]) for i in zip(x, y)]
new_shape = Polygon(verts)
if ncp == 0:
poly = new_shape
else:
# check if this is a multipolygon by checking vertex
# order.
if is_clockwise(verts):
# Clockwise is a hole, set to interiors
if not poly.interiors:
poly.interiors = [new_shape.exterior]
else:
poly.interiors.append(new_shape.exterior)
else:
geoms.append(poly)
level.append(levels[idx])
poly = new_shape
if poly is not None:
# store geometry object
geoms.append(poly)
level.append(levels[idx])
else:
paths = ctr.get_paths()
for pi, path in enumerate(paths):
# skip empty paths
if path.vertices.shape[0] == 0:
continue
polys = []
segs = []
for seg in path.iter_segments():
pts, code = seg
if code == Path.MOVETO:
if len(segs) > 0:
polys.append(Polygon(segs))
segs = []
segs.append(pts)
elif code == Path.LINETO:
segs.append(pts)
elif code == Path.CLOSEPOLY:
segs.append(pts)
segs.append(segs[0]) # add closing segment
polys.append(Polygon(segs))
segs = []
if len(segs) > 0:
polys.append(Polygon(segs))
geoms.extend(polys)
level.extend(repeat(ctr.levels[pi], len(polys)))
if verbose:
print(f"Writing {len(level)} polygons")
ra = np.array(level, dtype=[(fieldname, float)]).view(np.recarray)
recarray2shp(ra, geoms, filename, **kwargs)
[docs]def export_array_contours(
modelgrid,
filename: Union[str, os.PathLike],
a,
fieldname="level",
interval=None,
levels=None,
maxlevels=1000,
**kwargs,
):
"""
Contour an array using matplotlib; write shapefile of contours.
Parameters
----------
modelgrid : flopy.discretization.Grid object
model grid object
filename : str or PathLike
Path of output file with '.shp' extension.
a : 2D numpy array
Array to contour
fieldname : str
gis field name
interval : float
interval to calculate levels from
levels : list
list of contour levels
maxlevels : int
maximum number of contour levels
**kwargs : keyword arguments to flopy.export.shapefile_utils.recarray2shp
"""
import matplotlib.pyplot as plt
if interval is not None:
imin = np.nanmin(a)
imax = np.nanmax(a)
nlevels = np.round(np.abs(imax - imin) / interval, 2)
msg = f"{nlevels:.0f} levels at interval of {interval} > maxlevels={maxlevels}"
assert nlevels < maxlevels, msg
levels = np.arange(imin, imax, interval)
ax = plt.subplots()[-1]
layer = kwargs.pop("layer", 0)
ctr = contour_array(modelgrid, ax, a, layer, levels=levels)
kwargs["mg"] = modelgrid
export_contours(filename, ctr, fieldname, **kwargs)
plt.close()
[docs]def contour_array(modelgrid, ax, a, layer=0, **kwargs):
"""
Create a QuadMesh plot of the specified array using pcolormesh
Parameters
----------
modelgrid : flopy.discretization.Grid object
modelgrid object
ax : matplotlib.axes.Axes
ax to add the contours
a : np.ndarray
array to contour
layer : int, optional
layer to contour
Returns
-------
contour_set : ContourSet
"""
from ..plot import PlotMapView
kwargs["ax"] = ax
pmv = PlotMapView(modelgrid=modelgrid, layer=layer)
contour_set = pmv.contour_array(a=a, **kwargs)
return contour_set