Source code for flopy.export.netcdf

import copy
import json
import os
import platform
import socket
import time
from datetime import datetime
from os import PathLike
from pathlib import Path
from typing import Optional, Union

import numpy as np
from packaging import version

from ..utils import import_optional_dependency
from ..utils.crs import get_authority_crs
from .longnames import NC_LONG_NAMES
from .metadata import acdd

# globals
FILLVALUE = -99999.9
ITMUNI = {
    0: "undefined",
    1: "seconds",
    2: "minutes",
    3: "hours",
    4: "days",
    5: "years",
}
PRECISION_STRS = ["f4", "f8", "i4"]

STANDARD_VARS = ["longitude", "latitude", "layer", "elevation", "time"]


[docs]class Logger: """ Basic class for logging events during the linear analysis calculations if filename is passed, then an file handle is opened Parameters ---------- filename : bool or string if string, it is the log file to write. If a bool, then log is written to the screen. echo (bool): a flag to force screen output Attributes ---------- items : dict tracks when something is started. If a log entry is not in items, then it is treated as a new entry with the string being the key and the datetime as the value. If a log entry is in items, then the end time and delta time are written and the item is popped from the keys """ def __init__(self, filename, echo=False): self.items = {} self.echo = bool(echo) if filename is True: self.echo = True self.filename = None elif filename: self.f = open(filename, "w", 0) # unbuffered self.t = datetime.now() self.log(f"opening {filename} for logging") else: self.filename = None
[docs] def log(self, phrase): """ log something that happened Parameters ---------- phrase : str the thing that happened """ t = datetime.now() if phrase in self.items.keys(): t0 = self.items.pop(phrase) s = f"{t} finished: {phrase}, took: {t - t0}\n" if self.echo: print(s, end="") if self.filename: self.f.write(s) else: s = f"{t} starting: {phrase}\n" if self.echo: print(s, end="") if self.filename: self.f.write(s) self.items[phrase] = copy.deepcopy(t)
[docs] def warn(self, message): """ Write a warning to the log file Parameters ---------- message : str the warning text """ s = f"{datetime.now()} WARNING: {message}\n" if self.echo: print(s, end="") if self.filename: self.f.write(s) return
[docs]class NetCdf: """ Support for writing a netCDF4 compliant file from a flopy model Parameters ---------- output_filename : str or PathLike Path of the .nc file to write model : flopy model instance time_values : the entries for the time dimension if not None, the constructor will initialize the file. If None, the perlen array of ModflowDis will be used z_positive : str ('up' or 'down') Positive direction of vertical coordinates written to NetCDF file. (default 'down') verbose : if True, stdout is verbose. If str, then a log file is written to the verbose file prj : str, optional, default None PROJ4 string logger : Logger, optional, default None Logging object for custom logging configuration forgive : what to do if a duplicate variable name is being created. If True, then the newly requested var is skipped. If False, then an exception is raised. **kwargs : keyword arguments modelgrid : flopy.discretization.Grid instance user supplied model grid which will be used in lieu of the model object modelgrid for netcdf production Notes ----- This class relies heavily on the grid and modeltime objects, including these attributes: lenuni, itmuni, start_datetime, and proj4. Make sure these attributes have meaningful values. """ def __init__( self, output_filename: Union[str, PathLike], model, time_values=None, z_positive="up", verbose=None, prj=None, logger=None, forgive=False, **kwargs, ): output_filename = Path(output_filename) assert output_filename.suffix == ".nc" if verbose is None: verbose = model.verbose if logger is not None: self.logger = logger else: self.logger = Logger(verbose) self.var_attr_dict = {} self.log = self.logger.log if output_filename.exists(): self.logger.warn(f"removing existing nc file: {output_filename}") output_filename.unlink() self.output_filename = output_filename self.forgive = bool(forgive) self.model = model self.model_grid = model.modelgrid if "modelgrid" in kwargs: self.model_grid = kwargs.pop("modelgrid") self.modeltime = model.modeltime if prj is not None: self.model_grid.proj4 = prj if self.model_grid.grid_type == "structured": self.dimension_names = ("layer", "y", "x") STANDARD_VARS.extend(["delc", "delr"]) else: raise Exception(f"Grid type {self.model_grid.grid_type} not supported.") self.shape = self.model_grid.shape dt = self.modeltime.start_datetime self.start_datetime = self.modeltime.get_datetime_string(dt) self.logger.log(f"start datetime:{self.start_datetime}") crs = get_authority_crs(self.model_grid.crs) if crs is None: self.logger.warn("model has no coordinate reference system specified. ") self.model_crs = crs self.transformer = None self.grid_units = self.model_grid.units self.z_positive = z_positive if self.grid_units is None: self.grid_units = "undefined" assert self.grid_units in {"feet", "meters", "undefined"}, ( "unsupported length units: " + self.grid_units ) self.time_units = self.modeltime.time_units self.log("initializing attributes") self.nc_crs_str = "epsg:4326" self.nc_crs_longname = "https://www.opengis.net/def/crs/EPSG/0/4326" self.nc_semi_major = 6378137.0 self.nc_inverse_flat = 298.257223563 self.global_attributes = {} self.global_attributes["namefile"] = self.model.namefile self.global_attributes["model_ws"] = self.model.model_ws self.global_attributes["exe_name"] = self.model.exe_name self.global_attributes["modflow_version"] = self.model.version self.global_attributes["create_hostname"] = socket.gethostname() self.global_attributes["create_platform"] = platform.system() self.global_attributes["create_directory"] = os.getcwd() htol, rtol = -999, -999 try: htol, rtol = self.model.solver_tols() except Exception as e: self.logger.warn(f"unable to get solver tolerances:{e!s}") self.global_attributes["solver_head_tolerance"] = htol self.global_attributes["solver_flux_tolerance"] = rtol spatial_attribs = { "xll": self.model_grid.xoffset, "yll": self.model_grid.yoffset, "rotation": self.model_grid.angrot, "crs": self.model_grid.crs, } for n, v in spatial_attribs.items(): self.global_attributes["flopy_sr_" + n] = v self.global_attributes["start_datetime"] = self.start_datetime self.fillvalue = FILLVALUE # initialize attributes self.grid_crs = None self.zs = self.model_grid.xyzcellcenters[2].copy() self.ys = None self.xs = None self.nc = None self.log("initializing attributes") self.time_values_arg = time_values self.log("initializing file") self.initialize_file(time_values=self.time_values_arg) self.log("initializing file") def __enter__(self): """Enter context with statement, returning with an open dataset.""" return self def __exit__(self, *exc): """Exit context with statement, write and close dataset.""" self.write() def __add__(self, other): new_net = NetCdf.zeros_like(self) if np.isscalar(other) or isinstance(other, np.ndarray): for vname in self.var_attr_dict.keys(): new_net.nc.variables[vname][:] = self.nc.variables[vname][:] + other elif isinstance(other, NetCdf): for vname in self.var_attr_dict.keys(): new_net.nc.variables[vname][:] = ( self.nc.variables[vname][:] + other.nc.variables[vname][:] ) else: raise Exception(f"NetCdf.__add__(): unrecognized other:{type(other)}") new_net.nc.sync() return new_net def __sub__(self, other): new_net = NetCdf.zeros_like(self) if np.isscalar(other) or isinstance(other, np.ndarray): for vname in self.var_attr_dict.keys(): new_net.nc.variables[vname][:] = self.nc.variables[vname][:] - other elif isinstance(other, NetCdf): for vname in self.var_attr_dict.keys(): new_net.nc.variables[vname][:] = ( self.nc.variables[vname][:] - other.nc.variables[vname][:] ) else: raise Exception(f"NetCdf.__sub__(): unrecognized other:{type(other)}") new_net.nc.sync() return new_net def __mul__(self, other): new_net = NetCdf.zeros_like(self) if np.isscalar(other) or isinstance(other, np.ndarray): for vname in self.var_attr_dict.keys(): new_net.nc.variables[vname][:] = self.nc.variables[vname][:] * other elif isinstance(other, NetCdf): for vname in self.var_attr_dict.keys(): new_net.nc.variables[vname][:] = ( self.nc.variables[vname][:] * other.nc.variables[vname][:] ) else: raise Exception(f"NetCdf.__mul__(): unrecognized other:{type(other)}") new_net.nc.sync() return new_net def __truediv__(self, other): new_net = NetCdf.zeros_like(self) with np.errstate(invalid="ignore"): if np.isscalar(other) or isinstance(other, np.ndarray): for vname in self.var_attr_dict.keys(): new_net.nc.variables[vname][:] = self.nc.variables[vname][:] / other elif isinstance(other, NetCdf): for vname in self.var_attr_dict.keys(): new_net.nc.variables[vname][:] = ( self.nc.variables[vname][:] / other.nc.variables[vname][:] ) else: raise Exception(f"NetCdf.__sub__(): unrecognized other:{type(other)}") new_net.nc.sync() return new_net
[docs] def append(self, other, suffix="_1"): assert isinstance(other, NetCdf) or isinstance(other, dict) if isinstance(other, NetCdf): for vname in other.var_attr_dict.keys(): attrs = other.var_attr_dict[vname].copy() var = other.nc.variables[vname] new_vname = vname if vname in self.nc.variables.keys(): if vname not in STANDARD_VARS: new_vname = vname + suffix if "long_name" in attrs: attrs["long_name"] += " " + suffix else: continue assert new_vname not in self.nc.variables.keys(), ( "var already exists:{} in {}".format( new_vname, ",".join(self.nc.variables.keys()) ) ) attrs["max"] = var[:].max() attrs["min"] = var[:].min() new_var = self.create_variable( new_vname, attrs, var.dtype, dimensions=var.dimensions ) new_var[:] = var[:] else: for vname, array in other.items(): vname_norm = self.normalize_name(vname) assert vname_norm in self.nc.variables.keys(), ( f"dict var not in self.vars:{vname}-->" + ",".join(self.nc.variables.keys()) ) new_vname = vname_norm + suffix assert new_vname not in self.nc.variables.keys() attrs = self.var_attr_dict[vname_norm].copy() attrs["max"] = np.nanmax(array) attrs["min"] = np.nanmin(array) attrs["name"] = new_vname attrs["long_name"] = attrs["long_name"] + " " + suffix var = self.nc.variables[vname_norm] new_var = self.create_variable( new_vname, attrs, var.dtype, dimensions=var.dimensions ) try: new_var[:] = array except: new_var[:, 0] = array self.nc.sync() return
[docs] def copy(self, output_filename): new_net = NetCdf.zeros_like(self, output_filename=output_filename) for vname in self.var_attr_dict.keys(): new_net.nc.variables[vname][:] = self.nc.variables[vname][:] new_net.nc.sync() return new_net
@property def nc_crs(self): return get_authority_crs(self.nc_crs_str)
[docs] @classmethod def zeros_like(cls, other, output_filename=None, verbose=None, logger=None): new_net = NetCdf.empty_like( other, output_filename=output_filename, verbose=verbose, logger=logger ) # add the vars to the instance for vname in other.var_attr_dict.keys(): if new_net.nc.variables.get(vname) is not None: new_net.logger.warn(f"variable {vname} already defined, skipping") continue new_net.log(f"adding variable {vname}") var = other.nc.variables[vname] data = var[:] try: mask = data.mask data = np.array(data) except: mask = None new_data = np.zeros_like(data) new_data[mask] = FILLVALUE new_var = new_net.create_variable( vname, other.var_attr_dict[vname], var.dtype, dimensions=var.dimensions ) new_var[:] = new_data new_net.log(f"adding variable {vname}") global_attrs = {} for attr in other.nc.ncattrs(): if attr not in new_net.nc.ncattrs(): global_attrs[attr] = other.nc[attr] new_net.add_global_attributes(global_attrs) new_net.nc.sync() return new_net
[docs] @classmethod def empty_like(cls, other, output_filename=None, verbose=None, logger=None): if output_filename is None: output_filename = str(time.mktime(datetime.now().timetuple())) + ".nc" while os.path.exists(output_filename): print(f"{output_filename}...already exists") output_filename = str(time.mktime(datetime.now().timetuple())) + ".nc" print("creating temporary netcdf file..." + output_filename) new_net = cls( output_filename, other.model, time_values=other.time_values_arg, verbose=verbose, logger=logger, ) return new_net
[docs] def difference(self, other, minuend="self", mask_zero_diff=True, onlydiff=True): """ make a new NetCDF instance that is the difference with another netcdf file Parameters ---------- other : either an str filename of a netcdf file or a netCDF4 instance minuend : (optional) the order of the difference operation. Default is self (e.g. self - other). Can be "self" or "other" mask_zero_diff : bool flag to mask differences that are zero. If True, positions in the difference array that are zero will be set to self.fillvalue only_diff : bool flag to only add non-zero diffs to output file Returns ------- net NetCDF instance Notes ----- assumes the current NetCDF instance has been populated. The variable names and dimensions between the two files must match exactly. The name of the new .nc file is <self.output_filename>.diff.nc. The masks from both self and other are carried through to the new instance """ assert self.nc is not None, ( "can't call difference() if nc hasn't been populated" ) netCDF4 = import_optional_dependency("netCFD4") if isinstance(other, str): assert os.path.exists(other), f"filename 'other' not found:{other}" other = netCDF4.Dataset(other, "r") assert isinstance(other, netCDF4.Dataset) # check for similar variables self_vars = set(self.nc.variables.keys()) other_vars = set(other.variables) diff = self_vars.symmetric_difference(other_vars) if len(diff) > 0: self.logger.warn( "variables are not the same between the two nc files: " + ",".join(diff) ) return # check for similar dimensions self_dimens = self.nc.dimensions other_dimens = other.dimensions for d in self_dimens.keys(): if d not in other_dimens: self.logger.warn(f"missing dimension in other:{d}") return if len(self_dimens[d]) != len(other_dimens[d]): self.logger.warn( f"dimension not consistent: {self_dimens[d]}:{other_dimens[d]}" ) return # should be good to go time_values = self.nc.variables.get("time")[:] new_net = NetCdf( self.output_filename.replace(".nc", ".diff.nc"), self.model, time_values=time_values, ) # add the vars to the instance for vname in self_vars: if ( vname not in self.var_attr_dict or new_net.nc.variables.get(vname) is not None ): self.logger.warn(f"skipping variable: {vname}") continue self.log(f"processing variable {vname}") s_var = self.nc.variables[vname] o_var = other.variables[vname] s_data = s_var[:] o_data = o_var[:] o_mask, s_mask = None, None # keep the masks to apply later if isinstance(s_data, np.ma.MaskedArray): self.logger.warn(f"masked array for {vname}") s_mask = s_data.mask s_data = np.array(s_data) s_data[s_mask] = 0.0 else: np.nan_to_num(s_data) if isinstance(o_data, np.ma.MaskedArray): o_mask = o_data.mask o_data = np.array(o_data) o_data[o_mask] = 0.0 else: np.nan_to_num(o_data) # difference with self if minuend.lower() == "self": d_data = s_data - o_data elif minuend.lower() == "other": d_data = o_data - s_data else: mess = f"unrecognized minuend {minuend}" self.logger.warn(mess) raise Exception(mess) # check for non-zero diffs if onlydiff and d_data.sum() == 0.0: self.logger.warn(f"var {vname} has zero differences, skipping...") continue self.logger.warn( f"resetting diff attrs max,min:{d_data.min()},{d_data.max()}" ) attrs = self.var_attr_dict[vname].copy() attrs["max"] = np.nanmax(d_data) attrs["min"] = np.nanmin(d_data) # reapply masks if s_mask is not None: self.log("applying self mask") s_mask[d_data != 0.0] = False d_data[s_mask] = FILLVALUE self.log("applying self mask") if o_mask is not None: self.log("applying other mask") o_mask[d_data != 0.0] = False d_data[o_mask] = FILLVALUE self.log("applying other mask") d_data[np.isnan(d_data)] = FILLVALUE if mask_zero_diff: d_data[np.asarray(d_data == 0.0).nonzero()] = FILLVALUE var = new_net.create_variable( vname, attrs, s_var.dtype, dimensions=s_var.dimensions ) var[:] = d_data self.log(f"processing variable {vname}") new_net.nc.sync()
[docs] def write(self): """write the nc object to disk""" self.log("writing nc file") assert self.nc is not None, "netcdf.write() error: nc file not initialized" # write any new attributes that have been set since # initializing the file for k, v in self.global_attributes.items(): try: if self.nc.attributes.get(k) is not None: self.nc.setncattr(k, v) except Exception as e: self.logger.warn(f"error setting global attribute {k}: {e!s}") self.nc.sync() self.nc.close() self.log("writing nc file")
[docs] def initialize_geometry(self): """initialize the geometric information needed for the netcdf file """ pyproj = import_optional_dependency("pyproj") from ..utils.parse_version import Version # Check if using newer pyproj version conventions if version.parse(pyproj.__version__) < version.parse("2.2"): raise ValueError("The FloPy NetCDF module requires pyproj >= 2.2.0.") print("initialize_geometry::") self.log(f"model crs: {self.model_crs}") print(f"model crs: {self.model_crs}") vmin, vmax = self.model_grid.botm.min(), self.model_grid.top.max() if self.z_positive == "down": vmin, vmax = vmax, vmin else: self.zs = self.model_grid.xyzcellcenters[2].copy() ys = self.model_grid.xyzcellcenters[1].copy() xs = self.model_grid.xyzcellcenters[0].copy() # Transform to a known CRS if self.model_crs is not None and self.nc_crs is not None: self.transformer = pyproj.Transformer.from_crs( self.model_crs, self.nc_crs, always_xy=True ) print(f"initialize_geometry::nc_crs = {self.nc_crs}") xmin, xmax, ymin, ymax = self.model_grid.extent if self.transformer is not None: print(f"transforming coordinates using = {self.transformer}") self.log("projecting grid cell center arrays") self.xs, self.ys = self.transformer.transform(xs, ys) # get transformed bounds and record to check against ScienceBase later bbox = np.array([[xmin, ymin], [xmin, ymax], [xmax, ymax], [xmax, ymin]]) x, y = self.transformer.transform(*bbox.transpose()) self.bounds = x.min(), y.min(), x.max(), y.max() else: self.bounds = xmin, ymin, xmax, ymax self.vbounds = vmin, vmax
[docs] def initialize_file(self, time_values=None): """ initialize the netcdf instance, including global attributes, dimensions, and grid information Parameters ---------- time_values : list of times to use as time dimension entries. If none, then use the times in self.model.dis.perlen and self.start_datetime """ from ..version import __version__ as version if self.nc is not None: raise Exception("nc file already initialized") self.log("initializing geometry") self.initialize_geometry() self.log("initializing geometry") netCDF4 = import_optional_dependency("netCDF4") # open the file for writing try: self.nc = netCDF4.Dataset(self.output_filename, "w") except Exception as e: raise Exception("error creating netcdf dataset") from e # write some attributes self.log("setting standard attributes") self.nc.setncattr("Conventions", f"CF-1.6, ACDD-1.3, flopy {version}") self.nc.setncattr( "date_created", datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ") ) self.nc.setncattr("geospatial_vertical_positive", str(self.z_positive)) min_vertical = np.min(self.zs) max_vertical = np.max(self.zs) self.nc.setncattr("geospatial_vertical_min", min_vertical) self.nc.setncattr("geospatial_vertical_max", max_vertical) self.nc.setncattr("geospatial_vertical_resolution", "variable") self.nc.setncattr("featureType", "Grid") for k, v in self.global_attributes.items(): try: self.nc.setncattr(k, v) except Exception as e: self.logger.warn(f"error setting global attribute {k}: {e!s}") self.global_attributes = {} self.log("setting standard attributes") # spatial dimensions self.log("creating dimensions") # time if time_values is None: time_values = np.cumsum(self.modeltime.perlen) self.nc.createDimension("time", len(time_values)) for name, length in zip(self.dimension_names, self.shape): self.nc.createDimension(name, length) self.log("creating dimensions") self.log("setting CRS info") # Metadata variables crs = self.nc.createVariable("crs", "i4") crs.long_name = self.nc_crs_longname crs.crs_wkt = self.nc_crs.to_wkt() crs.semi_major_axis = self.nc_semi_major crs.inverse_flattening = self.nc_inverse_flat self.log("setting CRS info") attribs = { "units": f"{self.time_units} since {self.start_datetime}", "standard_name": "time", "long_name": NC_LONG_NAMES.get("time", "time"), "calendar": "gregorian", "_CoordinateAxisType": "Time", } time = self.create_variable( "time", attribs, precision_str="f8", dimensions=("time",) ) self.logger.warn(f"time_values:{time_values!s}") time[:] = np.asarray(time_values) # Elevation attribs = { "units": self.model_grid.units, "standard_name": "elevation", "long_name": NC_LONG_NAMES.get("elevation", "elevation"), "axis": "Z", "valid_min": min_vertical, "valid_max": max_vertical, "positive": self.z_positive, } elev = self.create_variable( "elevation", attribs, precision_str="f8", dimensions=self.dimension_names ) elev[:] = self.zs # Longitude attribs = { "units": "degrees_east", "standard_name": "longitude", "long_name": NC_LONG_NAMES.get("longitude", "longitude"), "axis": "X", "_CoordinateAxisType": "Lon", } lon = self.create_variable( "longitude", attribs, precision_str="f8", dimensions=self.dimension_names[1:], ) lon[:] = self.xs self.log("creating longitude var") # Latitude self.log("creating latitude var") attribs = { "units": "degrees_north", "standard_name": "latitude", "long_name": NC_LONG_NAMES.get("latitude", "latitude"), "axis": "Y", "_CoordinateAxisType": "Lat", } lat = self.create_variable( "latitude", attribs, precision_str="f8", dimensions=self.dimension_names[1:] ) lat[:] = self.ys # x self.log("creating x var") attribs = { "units": self.model_grid.units, "standard_name": "projection_x_coordinate", "long_name": NC_LONG_NAMES.get("x", "x coordinate of projection"), "axis": "X", } x = self.create_variable( "x_proj", attribs, precision_str="f8", dimensions=self.dimension_names[1:] ) x[:] = self.model_grid.xyzcellcenters[0] # y self.log("creating y var") attribs = { "units": self.model_grid.units, "standard_name": "projection_y_coordinate", "long_name": NC_LONG_NAMES.get("y", "y coordinate of projection"), "axis": "Y", } y = self.create_variable( "y_proj", attribs, precision_str="f8", dimensions=self.dimension_names[1:] ) y[:] = self.model_grid.xyzcellcenters[1] # convert pyproj.CRS object to a # Climate and Forecast (CF) Grid Mapping Version 1.8 dict. attribs = self.nc_crs.to_cf() if attribs is not None: self.log("creating grid mapping variable") self.create_variable( attribs["grid_mapping_name"], attribs, precision_str="f8" ) # layer self.log("creating layer var") attribs = { "units": "", "standard_name": "layer", "long_name": NC_LONG_NAMES.get("layer", "layer"), "positive": "down", "axis": "Z", } lay = self.create_variable("layer", attribs, dimensions=("layer",)) lay[:] = np.arange(0, self.shape[0]) self.log("creating layer var") if self.model_grid.grid_type == "structured": # delc attribs = { "units": self.model_grid.units.strip("s"), "long_name": NC_LONG_NAMES.get( "delc", "Model grid cell spacing along a column" ), } delc = self.create_variable("delc", attribs, dimensions=("y",)) delc[:] = self.model_grid.delc[::-1] if self.model_grid.angrot != 0: delc.comments = ( "This is the row spacing that applied to the UNROTATED grid. " "This grid HAS been rotated before being saved to NetCDF. " "To compute the unrotated grid, use the origin point and " "this array." ) # delr attribs = { "units": self.model_grid.units.strip("s"), "long_name": NC_LONG_NAMES.get( "delr", "Model grid cell spacing along a row" ), } delr = self.create_variable("delr", attribs, dimensions=("x",)) delr[:] = self.model_grid.delr[::-1] if self.model_grid.angrot != 0: delr.comments = ( "This is the col spacing that applied to the UNROTATED grid. " "This grid HAS been rotated before being saved to NetCDF. " "To compute the unrotated grid, use the origin point and " "this array." ) # Workaround for CF/CDM. # http://www.unidata.ucar.edu/software/thredds/current/netcdf-java/ # reference/StandardCoordinateTransforms.html # "explicit_field" exp = self.nc.createVariable("VerticalTransform", "S1") exp.transform_name = "explicit_field" exp.existingDataField = "elevation" exp._CoordinateTransformType = "vertical" exp._CoordinateAxes = "layer" self.nc.sync() return
[docs] def initialize_group( self, group="timeseries", dimensions=("time",), attributes=None, dimension_data=None, ): """ Method to initialize a new group within a netcdf file. This group can have independent dimensions from the global dimensions Parameters ---------- name : str name of the netcdf group dimensions : tuple data dimension names for group dimension_shape : tuple tuple of data dimension lengths attributes : dict nested dictionary of {dimension : {attributes}} for each netcdf group dimension dimension_data : dict dictionary of {dimension : [data]} for each netcdf group dimension """ if attributes is None: attributes = {} if dimension_data is None: dimension_data = {} if self.nc is None: self.initialize_file() if group in self.nc.groups: raise AttributeError(f"{group} group already initialized") self.log(f"creating netcdf group {group}") self.nc.createGroup(group) self.log(f"{group} group created") self.log(f"creating {group} group dimensions") for dim in dimensions: if dim == "time": if "time" not in dimension_data: time_values = np.cumsum(self.modeltime.perlen) else: time_values = dimension_data["time"] self.nc.groups[group].createDimension(dim, len(time_values)) else: if dim not in dimension_data: raise AssertionError( f"{dim} information must be supplied to dimension data" ) else: self.nc.groups[group].createDimension(dim, len(dimension_data[dim])) self.log(f"created {group} group dimensions") dim_names = tuple(i for i in dimensions if i != "time") for dim in dimensions: if dim.lower() == "time": if "time" not in attributes: unit_value = f"{self.time_units} since {self.start_datetime}" attribs = { "units": unit_value, "standard_name": "time", "long_name": NC_LONG_NAMES.get("time", "time"), "calendar": "gregorian", "Axis": "Y", "_CoordinateAxisType": "Time", } else: attribs = attributes["time"] time = self.create_group_variable( group, "time", attribs, precision_str="f8", dimensions=("time",) ) time[:] = np.asarray(time_values) elif dim.lower() == "zone": if "zone" not in attributes: attribs = { "units": "N/A", "standard_name": "zone", "long_name": "zonebudget zone", "Axis": "X", "_CoordinateAxisType": "Zone", } else: attribs = attributes["zone"] zone = self.create_group_variable( group, "zone", attribs, precision_str="i4", dimensions=("zone",) ) zone[:] = np.asarray(dimension_data["zone"]) else: attribs = attributes[dim] var = self.create_group_variable( group, dim, attribs, precision_str="f8", dimensions=dim_names ) var[:] = np.asarray(dimension_data[dim]) self.nc.sync()
[docs] @staticmethod def normalize_name(name): return name.replace(".", "_").replace(" ", "_").replace("-", "_")
[docs] def create_group_variable( self, group, name, attributes, precision_str, dimensions=("time",) ): """ Create a new group variable in the netcdf object Parameters ---------- name : str the name of the variable attributes : dict attributes to add to the new variable precision_str : str netcdf-compliant string. e.g. f4 dimensions : tuple which dimensions the variable applies to default : ("time","layer","x","y") group : str which netcdf group the variable goes in default : None which creates the variable in root Returns ------- nc variable Raises ------ AssertionError if precision_str not right AssertionError if variable name already in netcdf object AssertionError if one of more dimensions do not exist """ name = self.normalize_name(name) if name in STANDARD_VARS and name in self.nc.groups[group].variables.keys(): return if name in self.nc.groups[group].variables.keys(): if self.forgive: self.logger.warn(f"skipping duplicate {group} group variable: {name}") return else: raise Exception(f"duplicate {group} group variable name: {name}") self.log(f"creating group {group} variable: {name}") if precision_str not in PRECISION_STRS: raise AssertionError( "netcdf.create_variable() error: precision " f"string {precision_str} not in {PRECISION_STRS}" ) if group not in self.nc.groups: raise AssertionError( f"netcdf group `{group}` must be created before " "variables can be added to it" ) self.var_attr_dict[f"{group}/{name}"] = attributes var = self.nc.groups[group].createVariable( name, precision_str, dimensions, fill_value=self.fillvalue, zlib=True ) for k, v in attributes.items(): try: var.setncattr(k, v) except Exception as e: self.logger.warn( "error setting attribute " f"{k} for group {group} variable {name}: {e!s}" ) self.log(f"creating group {group} variable: {name}") self.nc.sync() return var
[docs] def create_variable( self, name, attributes, precision_str="f4", dimensions=("time", "layer"), group=None, ): """ Create a new variable in the netcdf object Parameters ---------- name : str the name of the variable attributes : dict attributes to add to the new variable precision_str : str netcdf-compliant string. e.g. f4 dimensions : tuple which dimensions the variable applies to default : ("time","layer","x","y") group : str which netcdf group the variable goes in default : None which creates the variable in root Returns ------- nc variable Raises ------ AssertionError if precision_str not right AssertionError if variable name already in netcdf object AssertionError if one of more dimensions do not exist """ # Normalize variable name name = self.normalize_name(name) # if this is a core var like a dimension... # long_name = attributes.pop("long_name",name) if name in STANDARD_VARS and name in self.nc.variables.keys(): return if name not in self.var_attr_dict.keys() and name in self.nc.variables.keys(): if self.forgive: self.logger.warn(f"skipping duplicate variable: {name}") return else: raise Exception(f"duplicate variable name: {name}") if name in self.nc.variables.keys(): raise Exception(f"duplicate variable name: {name}") self.log(f"creating variable: {name}") assert precision_str in PRECISION_STRS, ( "netcdf.create_variable() error: precision string {} not in {}".format( precision_str, PRECISION_STRS ) ) if self.nc is None: self.initialize_file() self.var_attr_dict[name] = attributes var = self.nc.createVariable( name, precision_str, dimensions, fill_value=self.fillvalue, zlib=True ) for k, v in attributes.items(): try: var.setncattr(k, v) except Exception as e: self.logger.warn( f"error setting attribute{k} for variable {name}: {e!s}" ) self.log(f"creating variable: {name}") self.nc.sync() return var
[docs] def add_global_attributes(self, attr_dict): """add global attribute to an initialized file Parameters ---------- attr_dict : dict(attribute name, attribute value) Returns ------- None Raises ------ Exception of self.nc is None (initialize_file() has not been called) """ if self.nc is None: mess = ( "NetCDF.add_global_attributes() should only " "be called after the file has been initialized" ) self.logger.warn(mess) raise Exception(mess) self.log("setting global attributes") self.nc.setncatts(attr_dict) self.log("setting global attributes") self.nc.sync()
[docs] def add_sciencebase_metadata(self, id, check=True): """Add metadata from ScienceBase using the flopy.export.metadata.acdd class. Returns ------- metadata : flopy.export.metadata.acdd object """ md = acdd(id, model=self.model) if md.sb is not None: if check: self._check_vs_sciencebase(md) # get set of public attributes attr = {n for n in dir(md) if "_" not in n[0]} # skip some convenience attributes skip = { "bounds", "creator", "sb", "xmlroot", "time_coverage", "get_sciencebase_xml_metadata", "get_sciencebase_metadata", } towrite = sorted(attr.difference(skip)) for k in towrite: v = getattr(md, k) if v is not None: # convert everything to strings if not isinstance(v, str): if isinstance(v, list): v = ",".join(v) else: v = str(v) self.global_attributes[k] = v self.nc.setncattr(k, v) self.write() return md
def _check_vs_sciencebase(self, md): """Check that model bounds read from flopy are consistent with those in ScienceBase.""" xmin, ymin, xmax, ymax = self.bounds tol = 1e-5 assert md.geospatial_lon_min - xmin < tol assert md.geospatial_lon_max - xmax < tol assert md.geospatial_lat_min - ymin < tol assert md.geospatial_lat_max - ymax < tol assert md.geospatial_vertical_min - self.vbounds[0] < tol assert md.geospatial_vertical_max - self.vbounds[1] < tol
[docs] def get_longnames_from_docstrings(self, outfile="longnames.py"): """ This is experimental. Scrape Flopy module docstrings and return docstrings for parameters included in the list of variables added to NetCdf object. Create a dictionary of longnames keyed by the NetCdf variable names; make each longname from the first sentence of the docstring for that parameter. One major limitation is that variables from mflists often aren't described in the docstrings. """ def startstop(ds): """Get just the Parameters section of the docstring.""" start, stop = 0, -1 for i, l in enumerate(ds): if "Parameters" in l and "----" in ds[i + 1]: start = i + 2 if l.strip() in {"Attributes", "Methods", "Returns", "Notes"}: stop = i - 1 break if i >= start and "----" in l: stop = i - 2 break return start, stop def get_entries(ds): """Parse docstring entries into dictionary.""" stuff = {} k = None for line in ds: if ( len(line) >= 5 and line[:4] == " " * 4 and line[4] != " " and ":" in line ): k = line.split(":")[0].strip() stuff[k] = "" # lines with parameter descriptions elif k is not None and len(line) > 10: # avoid orphans stuff[k] += line.strip() + " " return stuff # get a list of the flopy classes packages = [(pp.name[0], pp) for pp in self.model.packagelist] # get a list of the NetCDF variables attr = [v.split("_")[-1] for v in self.nc.variables] # parse docstrings to get long names longnames = dict.fromkeys(attr, "") for pkg in packages: # parse the docstring obj = pkg[-1] ds = obj.__doc__.split("\n") start, stop = startstop(ds) txt = ds[start:stop] if stop - start > 0: params = get_entries(txt) for k, v in params.items(): if k in attr: longnames[k] = v.split(". ")[0] longnames_dict = json.dumps(longnames, sort_keys=True, indent=4) with open(outfile, "w") as output: output.write("NC_LONG_NAMES = ") output.write(longnames_dict) return longnames