
"""
mfstructure module.  Contains classes related to package structure


"""

import ast
import keyword
import os
import warnings
from enum import Enum
from textwrap import TextWrapper

import numpy as np

from ..mfbase import PackageContainer, StructException

numeric_index_text = (
    "This argument is an index variable, which means that "
    "it should be treated as zero-based when working with "
    "FloPy and Python. Flopy will automatically subtract "
    "one when loading index variables and add one when "
    "writing index variables."
)
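
# A minimal sketch (not flopy API) of the zero-based convention described
# above: for an index variable, the value a user works with in FloPy is
# always one less than the value stored in the MODFLOW 6 file.
#
#     mf6_value = flopy_value + 1  # applied when writing index variables
#     flopy_value = mf6_value - 1  # applied when loading index variables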


class DfnType(Enum):
    common = 1
    sim_name_file = 2
    sim_tdis_file = 3
    ims_file = 4
    exch_file = 5
    model_name_file = 6
    model_file = 7
    gnc_file = 8
    mvr_file = 9
    utl = 10
    mvt_file = 11
    unknown = 999


class Dfn:
    """
    Base class for package file definitions.

    Attributes
    ----------
    dfndir : path
        folder containing package definition files (dfn)
    common : path
        file containing common information

    Methods
    -------
    get_file_list : () : list
        returns all of the dfn files found in dfndir.  Files are returned
        in the order defined by the local variable file_order.

    See Also
    --------

    Notes
    -----

    Examples
    --------
    """

    def __init__(self):
        # directories
        self.dfndir = os.path.join(".", "dfn")
        self.common = os.path.join(self.dfndir, "common.dfn")

    def get_file_list(self):
        file_order = [
            "sim-nam",  # dfn completed  tex updated
            "sim-tdis",  # dfn completed  tex updated
            "exg-gwfgwf",  # dfn completed  tex updated
            "sln-ims",  # dfn completed  tex updated
            "gwf-nam",  # dfn completed  tex updated
            "gwf-dis",  # dfn completed  tex updated
            "gwf-disv",  # dfn completed  tex updated
            "gwf-disu",  # dfn completed  tex updated
            "gwf-ic",  # dfn completed  tex updated
            "gwf-npf",  # dfn completed  tex updated
            "gwf-sto",  # dfn completed  tex updated
            "gwf-hfb",  # dfn completed  tex updated
            "gwf-chd",  # dfn completed  tex updated
            "gwf-wel",  # dfn completed  tex updated
            "gwf-drn",  # dfn completed  tex updated
            "gwf-riv",  # dfn completed  tex updated
            "gwf-ghb",  # dfn completed  tex updated
            "gwf-rch",  # dfn completed  tex updated
            "gwf-rcha",  # dfn completed  tex updated
            "gwf-evt",  # dfn completed  tex updated
            "gwf-evta",  # dfn completed  tex updated
            "gwf-maw",  # dfn completed  tex updated
            "gwf-sfr",  # dfn completed  tex updated
            "gwf-lak",  # dfn completed  tex updated
            "gwf-uzf",  # dfn completed  tex updated
            "gwf-mvr",  # dfn completed  tex updated
            "gwf-gnc",  # dfn completed  tex updated
            "gwf-oc",  # dfn completed  tex updated
            "utl-obs",
            "utl-ts",
            "utl-tab",
            "utl-tas",
        ]
        dfn_path, tail = os.path.split(os.path.realpath(__file__))
        dfn_path = os.path.join(dfn_path, "dfn")
        # construct list of dfn files to process in the order of file_order
        files = os.listdir(dfn_path)
        for f in files:
            if "common" in f or "flopy" in f:
                continue
            package_abbr = os.path.splitext(f)[0]
            if package_abbr not in file_order:
                file_order.append(package_abbr)
        return [
            f"{fname}.dfn" for fname in file_order if f"{fname}.dfn" in files
        ]

    def _file_type(self, file_name):
        # determine file type
        if len(file_name) >= 6 and file_name[0:6] == "common":
            return DfnType.common, None
        elif file_name[0:3] == "sim":
            if file_name[3:6] == "nam":
                return DfnType.sim_name_file, None
            elif file_name[3:7] == "tdis":
                return DfnType.sim_tdis_file, None
            else:
                return DfnType.unknown, None
        elif file_name[0:3] == "nam":
            return DfnType.sim_name_file, None
        elif file_name[0:4] == "tdis":
            return DfnType.sim_tdis_file, None
        elif file_name[0:3] == "sln" or file_name[0:3] == "ims":
            return DfnType.ims_file, None
        elif file_name[0:3] == "exg":
            return DfnType.exch_file, file_name[3:6]
        elif file_name[0:3] == "utl":
            return DfnType.utl, None
        else:
            model_type = file_name[0:3]
            if file_name[3:6] == "nam":
                return DfnType.model_name_file, model_type
            elif file_name[3:6] == "gnc":
                return DfnType.gnc_file, model_type
            elif file_name[3:6] == "mvr":
                return DfnType.mvr_file, model_type
            elif file_name[3:6] == "mvt":
                return DfnType.mvt_file, model_type
            else:
                return DfnType.model_file, model_type
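
# Illustrative sketch of how _file_type classifies dfn file names (the
# caller strips dashes first, e.g. "gwf-npf.dfn" becomes "gwfnpf.dfn"):
#
#     dfn = Dfn()
#     dfn._file_type("simnam.dfn")     # -> (DfnType.sim_name_file, None)
#     dfn._file_type("exggwfgwf.dfn")  # -> (DfnType.exch_file, "gwf")
#     dfn._file_type("gwfnpf.dfn")     # -> (DfnType.model_file, "gwf")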


class DfnPackage(Dfn):
    """
    Dfn child class that loads dfn information from a list structure
    stored in the auto-built package classes.

    Attributes
    ----------
    package : MFPackage
        MFPackage subclass that contains dfn information

    Methods
    -------
    get_block_structure_dict : (path : tuple, common : bool, model_file :
        bool) : dict
        returns a dictionary of block structure information for the package

    See Also
    --------

    Notes
    -----

    Examples
    --------
    """

    def __init__(self, package):
        super().__init__()
        self.package = package
        self.package_type = package._package_type
        self.dfn_file_name = package.dfn_file_name
        # the package type is always the text after the last -
        package_name = self.package_type.split("-")
        self.package_type = package_name[-1]
        # str.split always returns a list, so only the length check is needed
        if len(package_name) > 1:
            self.package_prefix = "".join(package_name[:-1])
        else:
            self.package_prefix = ""
        self.dfn_type, self.model_type = self._file_type(
            self.dfn_file_name.replace("-", "")
        )
        self.dfn_list = package.dfn

    def get_block_structure_dict(self, path, common, model_file, block_parent):
        block_dict = {}
        dataset_items_in_block = {}
        self.dataset_items_needed_dict = {}
        keystring_items_needed_dict = {}
        current_block = None

        # get header dict
        header_dict = {}
        for item in self.dfn_list[0]:
            if isinstance(item, str):
                if item == "multi-package":
                    header_dict["multi-package"] = True
                if item.startswith("package-type"):
                    header_dict["package-type"] = item.split(" ")[1]
        for dfn_entry in self.dfn_list[1:]:
            # load next data item
            new_data_item_struct = MFDataItemStructure()
            for next_line in dfn_entry:
                new_data_item_struct.set_value(next_line, common)
            # if block does not exist
            if (
                current_block is None
                or current_block.name != new_data_item_struct.block_name
            ):
                # create block
                current_block = MFBlockStructure(
                    new_data_item_struct.block_name,
                    path,
                    model_file,
                    block_parent,
                )
                # put block in block_dict
                block_dict[current_block.name] = current_block
                # init dataset item lookup
                self.dataset_items_needed_dict = {}
                dataset_items_in_block = {}

            # resolve block type
            if len(current_block.block_header_structure) > 0:
                if (
                    len(
                        current_block.block_header_structure[
                            0
                        ].data_item_structures
                    )
                    > 0
                    and current_block.block_header_structure[0]
                    .data_item_structures[0]
                    .type
                    == DatumType.integer
                ):
                    block_type = BlockType.transient
                else:
                    block_type = BlockType.multiple
            else:
                block_type = BlockType.single
            if new_data_item_struct.block_variable:
                block_dataset_struct = MFDataStructure(
                    new_data_item_struct,
                    model_file,
                    self.package_type,
                    self.dfn_list,
                )
                block_dataset_struct.parent_block = current_block
                self._process_needed_data_items(
                    block_dataset_struct, dataset_items_in_block
                )
                block_dataset_struct.set_path(
                    path + (new_data_item_struct.block_name,)
                )
                block_dataset_struct.add_item(new_data_item_struct)
                current_block.add_dataset(block_dataset_struct)
            else:
                new_data_item_struct.block_type = block_type
                dataset_items_in_block[new_data_item_struct.name] = (
                    new_data_item_struct
                )

                # if data item belongs to existing dataset(s)
                item_location_found = False
                if new_data_item_struct.name in self.dataset_items_needed_dict:
                    if new_data_item_struct.type == DatumType.record:
                        # record within a record - create a data set in
                        # place of the data item
                        new_data_item_struct = self._new_dataset(
                            new_data_item_struct,
                            current_block,
                            dataset_items_in_block,
                            path,
                            model_file,
                            False,
                        )
                        new_data_item_struct.record_within_record = True

                    for dataset in self.dataset_items_needed_dict[
                        new_data_item_struct.name
                    ]:
                        item_added = dataset.add_item(
                            new_data_item_struct, record=True
                        )
                        item_location_found = item_location_found or item_added
                # if data item belongs to an existing keystring
                if new_data_item_struct.name in keystring_items_needed_dict:
                    new_data_item_struct.set_path(
                        keystring_items_needed_dict[
                            new_data_item_struct.name
                        ].path
                    )
                    if new_data_item_struct.type == DatumType.record:
                        # record within a keystring - create a data set in
                        # place of the data item
                        new_data_item_struct = self._new_dataset(
                            new_data_item_struct,
                            current_block,
                            dataset_items_in_block,
                            path,
                            model_file,
                            False,
                        )
                    keystring_items_needed_dict[
                        new_data_item_struct.name
                    ].keystring_dict[
                        new_data_item_struct.name
                    ] = new_data_item_struct
                    item_location_found = True

                if new_data_item_struct.type == DatumType.keystring:
                    # add keystrings to search list
                    for key in new_data_item_struct.keystring_dict:
                        keystring_items_needed_dict[key] = new_data_item_struct

                # if data set does not exist
                if not item_location_found:
                    self._new_dataset(
                        new_data_item_struct,
                        current_block,
                        dataset_items_in_block,
                        path,
                        model_file,
                        True,
                    )

            if (
                current_block.name.upper() == "SOLUTIONGROUP"
                and len(current_block.block_header_structure) == 0
            ):
                # solution_group a special case for now
                block_data_item_struct = MFDataItemStructure()
                block_data_item_struct.name = "order_num"
                block_data_item_struct.data_items = ["order_num"]
                block_data_item_struct.type = DatumType.integer
                block_data_item_struct.longname = "order_num"
                block_data_item_struct.description = (
                    "internal variable to keep track of solution group number"
                )
                block_dataset_struct = MFDataStructure(
                    block_data_item_struct,
                    model_file,
                    self.package_type,
                    self.dfn_list,
                )
                block_dataset_struct.parent_block = current_block
                block_dataset_struct.set_path(
                    path + (new_data_item_struct.block_name,)
                )
                block_dataset_struct.add_item(block_data_item_struct)
                current_block.add_dataset(block_dataset_struct)
        return block_dict, header_dict

    def _new_dataset(
        self,
        new_data_item_struct,
        current_block,
        dataset_items_in_block,
        path,
        model_file,
        add_to_block=True,
    ):
        current_dataset_struct = MFDataStructure(
            new_data_item_struct, model_file, self.package_type, self.dfn_list
        )
        current_dataset_struct.set_path(
            path + (new_data_item_struct.block_name,)
        )
        self._process_needed_data_items(
            current_dataset_struct, dataset_items_in_block
        )
        if add_to_block:
            # add dataset
            current_block.add_dataset(current_dataset_struct)
            current_dataset_struct.parent_block = current_block
        current_dataset_struct.add_item(new_data_item_struct)
        return current_dataset_struct

    def _process_needed_data_items(
        self, current_dataset_struct, dataset_items_in_block
    ):
        # add data items needed to dictionary
        for item_name in current_dataset_struct.expected_data_items:
            if item_name in dataset_items_in_block:
                current_dataset_struct.add_item(
                    dataset_items_in_block[item_name]
                )
            else:
                if item_name in self.dataset_items_needed_dict:
                    self.dataset_items_needed_dict[item_name].append(
                        current_dataset_struct
                    )
                else:
                    self.dataset_items_needed_dict[item_name] = [
                        current_dataset_struct
                    ]


class DfnFile(Dfn):
    """
    Dfn child class that loads dfn information from a package definition
    (dfn) file.

    Attributes
    ----------
    file : str
        name of the file to be loaded

    Methods
    -------
    dict_by_name : {} : dict
        returns a dictionary of data item descriptions from the dfn file
        with the data item name as the dictionary key
    get_block_structure_dict : (path : tuple, common : bool, model_file :
        bool) : dict
        returns a dictionary of block structure information for the package

    See Also
    --------

    Notes
    -----

    Examples
    --------
    """

    def __init__(self, file):
        super().__init__()
        dfn_path, tail = os.path.split(os.path.realpath(__file__))
        dfn_path = os.path.join(dfn_path, "dfn")
        self._file_path = os.path.join(dfn_path, file)
        self.dfn_file_name = file
        self.dfn_type, self.model_type = self._file_type(
            self.dfn_file_name.replace("-", "")
        )
        self.package_type = os.path.splitext(file[4:])[0]
        # the package type is always the text after the last -
        package_name = self.package_type.split("-")
        self.package_type = package_name[-1]
        # str.split always returns a list, so only the length check is needed
        if len(package_name) > 1:
            self.package_prefix = "".join(package_name[:-1])
        else:
            self.package_prefix = ""
        self.file = file
        self.dataset_items_needed_dict = {}
        self.dfn_list = []

    def dict_by_name(self):
        name_dict = {}
        name = None
        with open(self._file_path, "r") as dfn_fp:
            for line in dfn_fp:
                if self._valid_line(line):
                    arr_line = line.strip().split()
                    if arr_line[0] == "name":
                        name = arr_line[1]
                    elif arr_line[0] == "description" and name is not None:
                        name_dict[name] = " ".join(arr_line[1:])
        return name_dict
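
    # Illustrative sketch: for a dfn file containing the (hypothetical)
    # entry
    #
    #     name save_flows
    #     type keyword
    #     description keyword to save flows
    #
    # dict_by_name() returns {"save_flows": "keyword to save flows"}.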

    def get_block_structure_dict(self, path, common, model_file, block_parent):
        self.dfn_list = []
        block_dict = {}
        dataset_items_in_block = {}
        self.dataset_items_needed_dict = {}
        keystring_items_needed_dict = {}
        current_block = None
        dfn_fp = open(self._file_path, "r")

        # load header
        header_dict = {}
        while True:
            line = dfn_fp.readline()
            if len(line) < 1 or line[0] != "#":
                break
            line_lst = line.strip().split()
            if len(line_lst) > 2 and line_lst[1] == "flopy":
                # load flopy data
                if line_lst[2] == "multi-package":
                    header_dict["multi-package"] = True
                if line_lst[2] == "parent_name_type" and len(line_lst) == 5:
                    header_dict["parent_name_type"] = [
                        line_lst[3],
                        line_lst[4],
                    ]
            elif len(line_lst) > 2 and line_lst[1] == "package-type":
                header_dict["package-type"] = line_lst[2]

        # load file definitions
        for line in dfn_fp:
            if self._valid_line(line):
                # load next data item
                new_data_item_struct = MFDataItemStructure()
                new_data_item_struct.set_value(line, common)
                self.dfn_list.append([line])
                for next_line in dfn_fp:
                    if self._empty_line(next_line):
                        break
                    if self._valid_line(next_line):
                        new_data_item_struct.set_value(next_line, common)
                        self.dfn_list[-1].append(next_line)

                # if block does not exist
                if (
                    current_block is None
                    or current_block.name != new_data_item_struct.block_name
                ):
                    # create block
                    current_block = MFBlockStructure(
                        new_data_item_struct.block_name,
                        path,
                        model_file,
                        block_parent,
                    )
                    # put block in block_dict
                    block_dict[current_block.name] = current_block
                    # init dataset item lookup
                    self.dataset_items_needed_dict = {}
                    dataset_items_in_block = {}

                # resolve block type
                if len(current_block.block_header_structure) > 0:
                    if (
                        len(
                            current_block.block_header_structure[
                                0
                            ].data_item_structures
                        )
                        > 0
                        and current_block.block_header_structure[0]
                        .data_item_structures[0]
                        .type
                        == DatumType.integer
                    ):
                        block_type = BlockType.transient
                    else:
                        block_type = BlockType.multiple
                else:
                    block_type = BlockType.single
                if new_data_item_struct.block_variable:
                    block_dataset_struct = MFDataStructure(
                        new_data_item_struct,
                        model_file,
                        self.package_type,
                        self.dfn_list,
                    )
                    block_dataset_struct.parent_block = current_block
                    self._process_needed_data_items(
                        block_dataset_struct, dataset_items_in_block
                    )
                    block_dataset_struct.set_path(
                        path + (new_data_item_struct.block_name,)
                    )
                    block_dataset_struct.add_item(
                        new_data_item_struct, False, self.dfn_list
                    )
                    current_block.add_dataset(block_dataset_struct)
                else:
                    new_data_item_struct.block_type = block_type
                    dataset_items_in_block[new_data_item_struct.name] = (
                        new_data_item_struct
                    )

                    # if data item belongs to existing dataset(s)
                    item_location_found = False
                    if (
                        new_data_item_struct.name
                        in self.dataset_items_needed_dict
                    ):
                        if new_data_item_struct.type == DatumType.record:
                            # record within a record - create a data set in
                            # place of the data item
                            new_data_item_struct = self._new_dataset(
                                new_data_item_struct,
                                current_block,
                                dataset_items_in_block,
                                path,
                                model_file,
                                False,
                            )
                            new_data_item_struct.record_within_record = True

                        for dataset in self.dataset_items_needed_dict[
                            new_data_item_struct.name
                        ]:
                            item_added = dataset.add_item(
                                new_data_item_struct, True, self.dfn_list
                            )
                            item_location_found = (
                                item_location_found or item_added
                            )
                    # if data item belongs to an existing keystring
                    if (
                        new_data_item_struct.name
                        in keystring_items_needed_dict
                    ):
                        new_data_item_struct.set_path(
                            keystring_items_needed_dict[
                                new_data_item_struct.name
                            ].path
                        )
                        if new_data_item_struct.type == DatumType.record:
                            # record within a keystring - create a data set
                            # in place of the data item
                            new_data_item_struct = self._new_dataset(
                                new_data_item_struct,
                                current_block,
                                dataset_items_in_block,
                                path,
                                model_file,
                                False,
                            )
                        keystring_items_needed_dict[
                            new_data_item_struct.name
                        ].keystring_dict[
                            new_data_item_struct.name
                        ] = new_data_item_struct
                        item_location_found = True

                    if new_data_item_struct.type == DatumType.keystring:
                        # add keystrings to search list
                        for key in new_data_item_struct.keystring_dict:
                            keystring_items_needed_dict[key] = (
                                new_data_item_struct
                            )

                    # if data set does not exist
                    if not item_location_found:
                        self._new_dataset(
                            new_data_item_struct,
                            current_block,
                            dataset_items_in_block,
                            path,
                            model_file,
                            True,
                        )

                if (
                    current_block.name.upper() == "SOLUTIONGROUP"
                    and len(current_block.block_header_structure) == 0
                ):
                    # solution_group a special case for now
                    block_data_item_struct = MFDataItemStructure()
                    block_data_item_struct.name = "order_num"
                    block_data_item_struct.data_items = ["order_num"]
                    block_data_item_struct.type = DatumType.integer
                    block_data_item_struct.longname = "order_num"
                    block_data_item_struct.description = (
                        "internal variable to keep track of "
                        "solution group number"
                    )
                    block_dataset_struct = MFDataStructure(
                        block_data_item_struct,
                        model_file,
                        self.package_type,
                        self.dfn_list,
                    )
                    block_dataset_struct.parent_block = current_block
                    block_dataset_struct.set_path(
                        path + (new_data_item_struct.block_name,)
                    )
                    block_dataset_struct.add_item(
                        block_data_item_struct, False, self.dfn_list
                    )
                    current_block.add_dataset(block_dataset_struct)
        dfn_fp.close()
        return block_dict, header_dict

    def _new_dataset(
        self,
        new_data_item_struct,
        current_block,
        dataset_items_in_block,
        path,
        model_file,
        add_to_block=True,
    ):
        current_dataset_struct = MFDataStructure(
            new_data_item_struct, model_file, self.package_type, self.dfn_list
        )
        current_dataset_struct.set_path(
            path + (new_data_item_struct.block_name,)
        )
        self._process_needed_data_items(
            current_dataset_struct, dataset_items_in_block
        )
        if add_to_block:
            # add dataset
            current_block.add_dataset(current_dataset_struct)
            current_dataset_struct.parent_block = current_block
        current_dataset_struct.add_item(
            new_data_item_struct, False, self.dfn_list
        )
        return current_dataset_struct

    def _process_needed_data_items(
        self, current_dataset_struct, dataset_items_in_block
    ):
        # add data items needed to dictionary
        for item_name in current_dataset_struct.expected_data_items:
            if item_name in dataset_items_in_block:
                current_dataset_struct.add_item(
                    dataset_items_in_block[item_name], False, self.dfn_list
                )
            else:
                if item_name in self.dataset_items_needed_dict:
                    self.dataset_items_needed_dict[item_name].append(
                        current_dataset_struct
                    )
                else:
                    self.dataset_items_needed_dict[item_name] = [
                        current_dataset_struct
                    ]

    def _valid_line(self, line):
        return len(line.strip()) > 1 and line[0] != "#"

    def _empty_line(self, line):
        return len(line.strip()) <= 1


class DataType(Enum):
    """
    Types of data that can be found in a package file
    """

    scalar_keyword = 1
    scalar = 2
    array = 3
    array_transient = 4
    list = 5
    list_transient = 6
    list_multiple = 7
    scalar_transient = 8
    scalar_keyword_transient = 9


class DatumType(Enum):
    """
    Types of individual pieces of data
    """

    keyword = 1
    integer = 2
    double_precision = 3
    string = 4
    constant = 5
    list_defined = 6
    keystring = 7
    record = 8
    repeating_record = 9
    recarray = 10


class BlockType(Enum):
    """
    Types of blocks that can be found in a package file
    """

    single = 1
    multiple = 2
    transient = 3
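
# Illustrative mapping implied by the "resolve block type" logic in the Dfn
# subclasses above: a block with no header dataset is BlockType.single; a
# repeating block whose first header variable is an integer, such as a
# stress period block opened with "BEGIN PERIOD 1", is BlockType.transient;
# any other repeating block header yields BlockType.multiple.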


class MFDataItemStructure:
    """
    Defines the structure of a single MF6 data item in a dfn file.

    Attributes
    ----------
    block_name : str
        name of block that data item is in
    name : str
        name of data item
    name_list : list
        list of alternate names for the data item, includes data item's
        main name "name"
    python_name : str
        name of data item referenced in python, with illegal python
        characters removed
    type : str
        type of the data item as it appears in the dfn file
    type_obj : python type
        type of the data item as a python type
    valid_values : list
        list of valid values for the data item.  if empty, this constraint
        does not apply
    data_items : list
        list of data items contained in this data_item, including itself
    in_record : bool
        in_record attribute as appears in dfn file
    tagged : bool
        whether data item is tagged.  if the data item is tagged its name
        is included in the MF6 input file
    just_data : bool
        when just_data is true only data appears in the MF6 input file.
        otherwise, name information appears
    shape : list
        describes the shape of the data
    layer_dims : list
        which dimensions in the shape function as layers, if None defaults
        to "layer"
    reader : str
        reader that MF6 uses to read the data
    optional : bool
        whether data item is optional or required as part of the MFData in
        the MF6 input file
    longname : str
        long name of the data item
    description : str
        description of the data item
    path : tuple
        a tuple describing the data item's location within the simulation
        (<model>,<package>,<block>,<data>)
    repeating : bool
        whether or not the data item can repeat in the MF6 input file
    block_variable : bool
        if true, this data item is part of the block header
    block_type : BlockType
        whether the block containing this item is a single non-repeating
        block, a multiple repeating block, or a transient repeating block
    keystring_dict : dict
        dictionary containing acceptable keystrings if this data item is of
        type keystring
    is_cellid : bool
        true if this data item is definitely of type cellid
    possible_cellid : bool
        true if this data item may be of type cellid
    ucase : bool
        this data item must be displayed in upper case in the MF6 input file

    Methods
    -------
    remove_cellid : (resolved_shape : list, cellid_size : int)
        removes the cellid size from the shape of a data item
    set_path : (path : tuple)
        sets the path to this data item to path
    get_rec_type : () : object type
        gets the type of object of this data item to be used in a numpy
        recarray

    See Also
    --------

    Notes
    -----

    Examples
    --------
    """

    def __init__(self):
        self.file_name_keywords = {"filein": False, "fileout": False}
        self.file_name_key_seq = {"fname": True}
        self.contained_keywords = {"fname": True, "file": True, "tdis6": True}
        self.block_name = None
        self.name = None
        self.display_name = None
        self.name_length = None
        self.is_aux = False
        self.is_boundname = False
        self.is_mname = False
        self.name_list = []
        self.python_name = None
        self.type = None
        self.type_string = None
        self.type_obj = None
        self.valid_values = []
        self.data_items = None
        self.in_record = False
        self.tagged = True
        self.just_data = False
        self.shape = []
        self.layer_dims = ["nlay"]
        self.reader = None
        self.optional = False
        self.longname = None
        self.description = ""
        self.path = None
        self.repeating = False
        self.block_variable = False
        self.block_type = BlockType.single
        self.keystring_dict = {}
        self.is_cellid = False
        self.possible_cellid = False
        self.ucase = False
        self.preserve_case = False
        self.default_value = None
        self.numeric_index = False
        self.support_negative_index = False
        self.construct_package = None
        self.construct_data = None
        self.parameter_name = None
        self.one_per_pkg = False
        self.jagged_array = None

    def set_value(self, line, common):
        arr_line = line.strip().split()
        if len(arr_line) > 1:
            if arr_line[0] == "block":
                self.block_name = " ".join(arr_line[1:])
            elif arr_line[0] == "name":
                if self.type == DatumType.keyword:
                    # display keyword names in upper case
                    self.display_name = " ".join(arr_line[1:]).upper()
                else:
                    self.display_name = " ".join(arr_line[1:]).lower()
                self.name = " ".join(arr_line[1:]).lower()
                self.name_list.append(self.name)
                if len(self.name) >= 6 and self.name[0:6] == "cellid":
                    self.is_cellid = True
                if (
                    self.name
                    and self.name[0:2] == "id"
                    and self.type == DatumType.string
                ):
                    self.possible_cellid = True
                self.python_name = self.name.replace("-", "_").lower()
                # don't allow name to be a python keyword
                if keyword.iskeyword(self.name):
                    self.python_name = f"{self.python_name}_"
                # performance optimizations
                if self.name == "aux":
                    self.is_aux = True
                if self.name == "boundname":
                    self.is_boundname = True
                if self.name[0:5] == "mname":
                    self.is_mname = True
                self.name_length = len(self.name)
            elif arr_line[0] == "other_names":
                arr_names = " ".join(arr_line[1:]).lower().split(",")
                for name in arr_names:
                    self.name_list.append(name)
            elif arr_line[0] == "type":
                if self.support_negative_index:
                    # type already automatically set when
                    # support_negative_index flag is set
                    return
                type_line = arr_line[1:]
                if len(type_line) <= 0:
                    raise StructException(
                        'Data structure "{}" does not have a type '
                        "specified.".format(self.name),
                        self.path,
                    )
                self.type_string = type_line[0].lower()
                self.type = self._str_to_enum_type(type_line[0])
                if (
                    self.name
                    and self.name[0:2] == "id"
                    and self.type == DatumType.string
                ):
                    self.possible_cellid = True
                if (
                    self.type == DatumType.recarray
                    or self.type == DatumType.record
                    or self.type == DatumType.repeating_record
                    or self.type == DatumType.keystring
                ):
                    self.data_items = type_line[1:]
                    if self.type == DatumType.keystring:
                        for item in self.data_items:
                            self.keystring_dict[item.lower()] = 0
                else:
                    self.data_items = [self.name]
                self.type_obj = self._get_type()
                if self.type == DatumType.keyword:
                    # display keyword names in upper case
                    if self.display_name is not None:
                        self.display_name = self.display_name.upper()
            elif arr_line[0] == "valid":
                for value in arr_line[1:]:
                    self.valid_values.append(value)
            elif arr_line[0] == "in_record":
                self.in_record = self._get_boolean_val(arr_line)
            elif arr_line[0] == "tagged":
                self.tagged = self._get_boolean_val(arr_line)
            elif arr_line[0] == "just_data":
                self.just_data = self._get_boolean_val(arr_line)
            elif arr_line[0] == "shape":
                if len(arr_line) > 1:
                    self.shape = []
                    for dimension in arr_line[1:]:
                        if dimension[-1] != ";":
                            dimension = dimension.replace("(", "")
                            dimension = dimension.replace(")", "")
                            dimension = dimension.replace(",", "")
                            if dimension[0] == "*":
                                dimension = dimension.replace("*", "")
                                # set as a "layer" dimension
                                self.layer_dims.insert(0, dimension)
                            self.shape.append(dimension)
                        else:
                            # only process what is after the last ; which by
                            # convention is the most generalized form of the
                            # shape
                            self.shape = []
                if len(self.shape) > 0:
                    self.repeating = True
            elif arr_line[0] == "reader":
                self.reader = " ".join(arr_line[1:])
            elif arr_line[0] == "optional":
                self.optional = self._get_boolean_val(arr_line)
            elif arr_line[0] == "longname":
                self.longname = " ".join(arr_line[1:])
            elif arr_line[0] == "description":
                if arr_line[1] == "REPLACE":
                    self.description = self._resolve_common(arr_line, common)
                elif len(arr_line) > 1 and arr_line[1].strip():
                    self.description = " ".join(arr_line[1:])
                # clean self.description
                replace_pairs = [
                    ("``", '"'),  # double quotes
                    ("''", '"'),
                    ("`", "'"),  # single quotes
                    ("~", " "),  # non-breaking space
                    (r"\mf", "MODFLOW 6"),
                    (r"\citep{konikow2009}", "(Konikow et al., 2009)"),
                    (r"\citep{hill1990preconditioned}", "(Hill, 1990)"),
                    (r"\ref{table:ftype}", "in mf6io.pdf"),
                    (r"\ref{table:gwf-obstypetable}", "in mf6io.pdf"),
                ]
                for s1, s2 in replace_pairs:
                    if s1 in self.description:
                        self.description = self.description.replace(s1, s2)
                # massage latex equations
                self.description = self.description.replace("$<$", "<")
                self.description = self.description.replace("$>$", ">")
                if "$" in self.description:
                    descsplit = self.description.split("$")
                    mylist = [
                        i.replace("\\", "")
                        + ":math:`"
                        + j.replace("\\", "\\\\")
                        + "`"
                        for i, j in zip(descsplit[::2], descsplit[1::2])
                    ]
                    mylist.append(descsplit[-1].replace("\\", ""))
                    self.description = "".join(mylist)
                else:
                    self.description = self.description.replace("\\", "")
            elif arr_line[0] == "block_variable":
                if len(arr_line) > 1:
                    self.block_variable = bool(arr_line[1])
            elif arr_line[0] == "ucase":
                if len(arr_line) > 1:
                    self.ucase = bool(arr_line[1])
            elif arr_line[0] == "preserve_case":
                self.preserve_case = self._get_boolean_val(arr_line)
            elif arr_line[0] == "default_value":
                self.default_value = " ".join(arr_line[1:])
            elif arr_line[0] == "numeric_index":
                self.numeric_index = self._get_boolean_val(arr_line)
            elif arr_line[0] == "support_negative_index":
                self.support_negative_index = self._get_boolean_val(arr_line)
                # must be double precision to support 0 and -0
                self.type_string = "double_precision"
                self.type = self._str_to_enum_type(self.type_string)
                self.type_obj = self._get_type()
            elif arr_line[0] == "construct_package":
                self.construct_package = arr_line[1]
            elif arr_line[0] == "construct_data":
                self.construct_data = arr_line[1]
            elif arr_line[0] == "parameter_name":
                self.parameter_name = arr_line[1]
            elif arr_line[0] == "one_per_pkg":
                self.one_per_pkg = bool(arr_line[1])
            elif arr_line[0] == "jagged_array":
                self.jagged_array = arr_line[1]
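
    # Illustrative sketch: set_value consumes one dfn line at a time, so a
    # hypothetical dfn entry can be replayed through it line by line.
    #
    #     item = MFDataItemStructure()
    #     for dfn_line in (
    #         "block options",
    #         "name save_flows",
    #         "type keyword",
    #         "optional true",
    #     ):
    #         item.set_value(dfn_line, None)
    #     # item.block_name == "options", item.type == DatumType.keyword,
    #     # and item.optional is True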

    def get_type_string(self):
        return f"[{self.type_string}]"

    def get_description(self, line_size, initial_indent, level_indent):
        item_desc = f"* {self.name} ({self.type_string}) {self.description}"
        if self.numeric_index or self.is_cellid:
            # append zero-based index text
            item_desc = f"{item_desc} {numeric_index_text}"
        twr = TextWrapper(
            width=line_size,
            initial_indent=initial_indent,
            drop_whitespace=True,
            subsequent_indent=f"  {initial_indent}",
        )
        item_desc = "\n".join(twr.wrap(item_desc))
        return item_desc

    def get_doc_string(self, line_size, initial_indent, level_indent):
        description = self.get_description(
            line_size, initial_indent + level_indent, level_indent
        )
        param_doc_string = f"{self.python_name} : {self.get_type_string()}"
        twr = TextWrapper(
            width=line_size,
            initial_indent=initial_indent,
            subsequent_indent=f"  {initial_indent}",
            drop_whitespace=True,
        )
        param_doc_string = "\n".join(twr.wrap(param_doc_string))
        param_doc_string = f"{param_doc_string}\n{description}"
        return param_doc_string

    def get_keystring_desc(self, line_size, initial_indent, level_indent):
        if self.type != DatumType.keystring:
            raise StructException(
                f'Can not get keystring description for "{self.name}" '
                "because it is not a keystring",
                self.path,
            )
        # get description of keystring elements
        description = ""
        for key, item in self.keystring_dict.items():
            if description:
                description = f"{description}\n"
            description = "{}{}".format(
                description,
                item.get_doc_string(line_size, initial_indent, level_indent),
            )
        return description

    def file_nam_in_nam_file(self):
        for key in self.contained_keywords:
            if self.name.lower().find(key) != -1:
                return True
        return False

    def indicates_file_name(self):
        if self.name.lower() in self.file_name_keywords:
            return True
        for key in self.file_name_key_seq:
            if key in self.name.lower():
                return True
        return False

    def is_file_name(self):
        if (
            self.name.lower() in self.file_name_keywords
            and self.file_name_keywords[self.name.lower()] is True
        ):
            return True
        for key, item in self.contained_keywords.items():
            if self.name.lower().find(key) != -1 and item is True:
                return True
        return False

    @staticmethod
    def remove_cellid(resolved_shape, cellid_size):
        # remove the cellid size from the shape
        for index, dimension in enumerate(resolved_shape):
            if dimension == cellid_size:
                resolved_shape[index] = 1
                break

    @staticmethod
    def _get_boolean_val(bool_option_line):
        if len(bool_option_line) <= 1:
            return False
        if bool_option_line[1].lower() == "true":
            return True
        return False

    @staticmethod
    def _find_close_bracket(arr_line):
        for index, word in enumerate(arr_line):
            word = word.strip()
            if len(word) > 0 and word[-1] == "}":
                return index
        return None

    @staticmethod
    def _resolve_common(arr_line, common):
        if common is None:
            return arr_line
        if not (arr_line[2] in common and len(arr_line) >= 4):
            raise StructException(
                f'Could not find line "{arr_line}" in common dfn.'
            )
        close_bracket_loc = MFDataItemStructure._find_close_bracket(
            arr_line[2:]
        )
        resolved_str = common[arr_line[2]]
        if close_bracket_loc is None:
            find_replace_str = " ".join(arr_line[3:])
        else:
            close_bracket_loc += 3
            find_replace_str = " ".join(arr_line[3:close_bracket_loc])
        find_replace_dict = ast.literal_eval(find_replace_str)
        for find_str, replace_str in find_replace_dict.items():
            resolved_str = resolved_str.replace(find_str, replace_str)
        # clean up formatting
        resolved_str = resolved_str.replace("\\texttt", "")
        resolved_str = resolved_str.replace("{", "")
        resolved_str = resolved_str.replace("}", "")
        return resolved_str
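
    # Illustrative sketch (hypothetical common.dfn content): a description
    # line of the form "description REPLACE <key> {<find>: <replace>}" is
    # resolved against the common dictionary.
    #
    #     common = {"save_flows": "keyword to save {#1} flows"}
    #     arr_line = ["description", "REPLACE", "save_flows",
    #                 "{'{#1}':", "'WEL'}"]
    #     MFDataItemStructure._resolve_common(arr_line, common)
    #     # -> "keyword to save WEL flows"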

    def set_path(self, path):
        self.path = path + (self.name,)
        mfstruct = MFStructure(True)
        for dimension in self.shape:
            dim_path = path + (dimension,)
            if dim_path in mfstruct.dimension_dict:
                mfstruct.dimension_dict[dim_path].append(self)
            else:
                mfstruct.dimension_dict[dim_path] = [self]

    def _get_type(self):
        if self.type == DatumType.double_precision:
            return float
        elif self.type == DatumType.integer:
            return int
        elif self.type == DatumType.constant:
            return bool
        elif self.type == DatumType.string:
            return str
        elif self.type == DatumType.list_defined:
            return str
        return str

    def _str_to_enum_type(self, type_string):
        if type_string.lower() == "keyword":
            return DatumType.keyword
        elif type_string.lower() == "integer":
            return DatumType.integer
        elif (
            type_string.lower() == "double_precision"
            or type_string.lower() == "double"
        ):
            return DatumType.double_precision
        elif type_string.lower() == "string":
            return DatumType.string
        elif type_string.lower() == "constant":
            return DatumType.constant
        elif type_string.lower() == "list-defined":
            return DatumType.list_defined
        elif type_string.lower() == "keystring":
            return DatumType.keystring
        elif type_string.lower() == "record":
            return DatumType.record
        elif type_string.lower() == "recarray":
            return DatumType.recarray
        elif type_string.lower() == "repeating_record":
            return DatumType.repeating_record
        else:
            exc_text = f'Data item type "{type_string}" not supported.'
            raise StructException(exc_text, self.path)

    def get_rec_type(self):
        item_type = self.type_obj
        if item_type == str or self.is_cellid or self.possible_cellid:
            return object
        return item_type


class MFDataStructure:
    """
    Defines the structure of an MF6 data set (one or more related data
    items) as defined in a dfn file.

    Parameters
    ----------
    data_item : MFDataItemStructure
        base data item associated with this data structure
    model_data : bool
        whether or not this is part of a model
    package_type : str
        abbreviated package type
    dfn_list : list
        dfn file contents as a list

    Attributes
    ----------
    type : str
        type of the data as it appears in the dfn file
    path : tuple
        a tuple describing the data's location within the simulation
        (<model>,<package>,<block>,<data>)
    optional : bool
        whether data is optional or required as part of the MFBlock in the
        MF6 input file
    name : str
        name of data item
    name_list : list
        list of alternate names for the data, includes data item's main
        name "name"
    python_name : str
        name of data referenced in python, with illegal python characters
        removed
    longname : str
        long name of the data
    repeating : bool
        whether or not the data can repeat in the MF6 input file
    layered : bool
        whether this data can appear by layer
    num_data_items : int
        number of data item structures contained in this MFDataStructure,
        including itself
    record_within_record : bool
        true if this MFDataStructure is a record within a container
        MFDataStructure
    file_data : bool
        true if data points to a file
    block_type : BlockType
        whether the block containing this data is a single non-repeating
        block, a multiple repeating block, or a transient repeating block
    block_variable : bool
        if true, this data is part of the block header
    model_data : bool
        if true, data is part of a model
    num_optional : int
        number of optional data items
    parent_block : MFBlockStructure
        parent block structure object
    data_item_structures : list
        list of data item structures contained in this MFDataStructure
    expected_data_items : dict
        dictionary of expected data item names for quick lookup
    shape : tuple
        shape of first data item

    Methods
    -------
    get_keywords : () : list
        returns a list of all keywords associated with this data
    supports_aux : () : bool
        returns true if this data supports aux variables
    add_item : (item : MFDataItemStructure, record : bool)
        adds a data item to this MFDataStructure
    set_path : (path : tuple)
        sets the path describing the data's location within the simulation
        (<model>,<package>,<block>,<data>)
    get_datatype : () : DataType
        returns the DataType of this data (array, list, scalar, ...)
    get_min_record_entries : () : int
        gets the minimum number of entries, as entered in a package file,
        for a single record.  excludes optional data items
    get_record_size : () : int
        gets the number of data items, excluding keyword data items, in
        this MFDataStructure
    all_keywords : () : bool
        returns true if all data items are keywords
    get_type_string : () : str
        returns descriptive string of the data types in this
        MFDataStructure
    get_description : () : str
        returns a description of the data
    get_type_array : (type_array : list):
        builds an array of data type information in type_array
    get_datum_type : (numpy_type : bool):
        returns the object type of the first data item in this
        MFDataStructure with a standard type.  If numpy_type is true,
        returns the type as a numpy type
    get_data_item_types: () : list
        returns a list of object type for every data item in this
        MFDataStructure
    first_non_keyword_index : () : int
        return the index of the first data item in this MFDataStructure
        that is not a keyword

    See Also
    --------

    Notes
    -----

    Examples
    --------
    """

    def __init__(self, data_item, model_data, package_type, dfn_list):
        self.type = data_item.type
        self.package_type = package_type
        self.path = None
        self.optional = data_item.optional
        self.name = data_item.name
        self.block_name = data_item.block_name
        self.name_length = len(self.name)
        self.is_aux = data_item.is_aux
        self.is_boundname = data_item.is_boundname
        self.name_list = data_item.name_list
        self.python_name = data_item.python_name
        self.longname = data_item.longname
        self.default_value = data_item.default_value
        self.repeating = False
        self.layered = (
            "nlay" in data_item.shape
            or "nodes" in data_item.shape
            or len(data_item.layer_dims) > 1
        )
        self.num_data_items = len(data_item.data_items)
        self.record_within_record = False
        self.file_data = False
        self.nam_file_data = False
        self.block_type = data_item.block_type
        self.block_variable = data_item.block_variable
        self.model_data = model_data
        self.num_optional = 0
        self.parent_block = None
        self._fpmerge_data_item(data_item, dfn_list)
        self.construct_package = data_item.construct_package
        self.construct_data = data_item.construct_data
        self.parameter_name = data_item.parameter_name
        self.one_per_pkg = data_item.one_per_pkg
        self.data_item_structures = []
        self.expected_data_items = {}
        self.shape = data_item.shape
        if (
            self.type == DatumType.recarray
            or self.type == DatumType.record
            or self.type == DatumType.repeating_record
        ):
            # record expected data for later error checking
            for data_item_name in data_item.data_items:
                self.expected_data_items[data_item_name] = len(
                    self.expected_data_items
                )
        else:
            self.expected_data_items[data_item.name] = len(
                self.expected_data_items
            )

    @property
    def basic_item(self):
        if not self.parent_block.parent_package.stress_package:
            return False
        for item in self.data_item_structures:
            if (
                (
                    (item.repeating or item.optional)
                    and not (
                        item.is_cellid or item.is_aux or item.is_boundname
                    )
                )
                or item.jagged_array is not None
                or item.type == DatumType.keystring
                or item.type == DatumType.keyword
                or (
                    item.description is not None
                    and "keyword `NONE'" in item.description
                )
            ):
                return False
        return True

    @property
    def is_mname(self):
        for item in self.data_item_structures:
            if item.is_mname:
                return True
        return False

    def get_item(self, item_name):
        for item in self.data_item_structures:
            if item.name.lower() == item_name.lower():
                return item
        return None

    def get_keywords(self):
        keywords = []
        if (
            self.type == DatumType.recarray
            or self.type == DatumType.record
            or self.type == DatumType.repeating_record
        ):
            for data_item_struct in self.data_item_structures:
                if data_item_struct.type == DatumType.keyword:
                    if len(keywords) == 0:
                        # create first keyword tuple
                        for name in data_item_struct.name_list:
                            keywords.append((name,))
                    else:
                        # update all keyword tuples with latest keyword found
                        new_keywords = []
                        for keyword_tuple in keywords:
                            for name in data_item_struct.name_list:
                                new_keywords.append(keyword_tuple + (name,))
                        if data_item_struct.optional:
                            keywords = keywords + new_keywords
                        else:
                            keywords = new_keywords
                elif data_item_struct.type == DatumType.keystring:
                    for keyword_item in data_item_struct.data_items:
                        keywords.append((keyword_item,))
                elif len(keywords) == 0:
                    if len(data_item_struct.valid_values) > 0:
                        new_keywords = []
                        # loop through all valid values and append to the end
                        # of each keyword tuple
                        for valid_value in data_item_struct.valid_values:
                            if len(keywords) == 0:
                                new_keywords.append((valid_value,))
                            else:
                                for keyword_tuple in keywords:
                                    new_keywords.append(
                                        keyword_tuple + (valid_value,)
                                    )
                        keywords = new_keywords
                    else:
                        for name in data_item_struct.name_list:
                            keywords.append((name,))
        else:
            for name in self.name_list:
                keywords.append((name,))
        return keywords
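
    # Illustrative sketch: for a record made of a keyword item "budget"
    # followed by a keyword item "fileout", get_keywords() accumulates
    # tuples of the recognized leading keywords, e.g.
    # [("budget", "fileout")], plus [("budget",)] as well if "fileout"
    # is optional.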

    def supports_aux(self):
        for data_item_struct in self.data_item_structures:
            if data_item_struct.name.lower() == "aux":
                return True
        return False

    def add_item(self, item, record=False, dfn_list=None):
        item_added = False
        if item.type != DatumType.recarray and (
            (
                item.type != DatumType.record
                and item.type != DatumType.repeating_record
            )
            or record is True
        ):
            if item.name not in self.expected_data_items:
                raise StructException(
                    'Could not find data item "{}" in expected data items '
                    "of data structure {}.".format(item.name, self.name),
                    self.path,
                )
            item.set_path(self.path)
            if len(self.data_item_structures) == 0:
                self.keyword = item.name
            # insert data item into correct location in array
            location = self.expected_data_items[item.name]
            if len(self.data_item_structures) > location:
                # only fill the slot if it still holds a placeholder value
                if self.data_item_structures[location] is None:
                    if isinstance(item, MFDataItemStructure):
                        self.nam_file_data = (
                            self.nam_file_data or item.file_nam_in_nam_file()
                        )
                        self.file_data = (
                            self.file_data or item.indicates_file_name()
                        )
                    # replace placeholder value
                    self.data_item_structures[location] = item
                    item_added = True
            else:
                for index in range(
                    0, location - len(self.data_item_structures)
                ):
                    # insert placeholder in array
                    self.data_item_structures.append(None)
                if isinstance(item, MFDataItemStructure):
                    self.nam_file_data = (
                        self.nam_file_data or item.file_nam_in_nam_file()
                    )
                    self.file_data = (
                        self.file_data or item.indicates_file_name()
                    )
                self.data_item_structures.append(item)
                item_added = True
            self.optional = self.optional and item.optional
            if item.optional:
                self.num_optional += 1
        if item_added:
            self._fpmerge_data_item(item, dfn_list)
        return item_added

    def _fpmerge_data_item(self, item, dfn_list):
        mfstruct = MFStructure()
        # check for flopy-specific dfn data
        if item.name.lower() in mfstruct.flopy_dict:
            # read flopy-specific dfn data
            for name, value in mfstruct.flopy_dict[item.name.lower()].items():
                line = f"{name} {value}"
                item.set_value(line, None)
                if dfn_list is not None:
                    dfn_list[-1].append(line)

    def set_path(self, path):
        self.path = path + (self.name,)

    def get_datatype(self):
        if self.type == DatumType.recarray:
            if self.block_type != BlockType.single and not self.block_variable:
                if self.block_type == BlockType.transient:
                    return DataType.list_transient
                else:
                    return DataType.list_multiple
            else:
                return DataType.list
        if (
            self.type == DatumType.record
            or self.type == DatumType.repeating_record
        ):
            record_size, repeating_data_item = self.get_record_size()
            if (
                record_size >= 1 and not self.all_keywords()
            ) or repeating_data_item:
                if (
                    self.block_type != BlockType.single
                    and not self.block_variable
                ):
                    if self.block_type == BlockType.transient:
                        return DataType.list_transient
                    else:
                        return DataType.list_multiple
                else:
                    return DataType.list
            else:
                if (
                    self.block_type != BlockType.single
                    and not self.block_variable
                ):
                    return DataType.scalar_transient
                else:
                    return DataType.scalar
        elif (
            len(self.data_item_structures) > 0
            and self.data_item_structures[0].repeating
        ):
            if self.data_item_structures[0].type == DatumType.string:
                return DataType.list
            else:
                if self.block_type == BlockType.single:
                    return DataType.array
                else:
                    return DataType.array_transient
        elif (
            len(self.data_item_structures) > 0
            and self.data_item_structures[0].type == DatumType.keyword
        ):
            if self.block_type != BlockType.single and not self.block_variable:
                return DataType.scalar_keyword_transient
            else:
                return DataType.scalar_keyword
        else:
            if self.block_type != BlockType.single and not self.block_variable:
                return DataType.scalar_transient
            else:
                return DataType.scalar
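
    # Illustrative mapping: a recarray held in a transient (PERIOD) block
    # resolves to DataType.list_transient, the same recarray in a single
    # OPTIONS-style block resolves to DataType.list, and a lone keyword in
    # a single block resolves to DataType.scalar_keyword.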

    def is_mult_or_trans(self):
        data_type = self.get_datatype()
        return data_type in (
            DataType.scalar_keyword_transient,
            DataType.array_transient,
            DataType.list_transient,
            DataType.list_multiple,
        )

    def get_min_record_entries(self):
        count = 0
        for data_item_structure in self.data_item_structures:
            if not data_item_structure.optional:
                if data_item_structure.type == DatumType.record:
                    count += data_item_structure.get_record_size()[0]
                else:
                    if data_item_structure.type != DatumType.keyword:
                        count += 1
        return count

    def get_record_size(self):
        count = 0
        repeating = False
        for data_item_structure in self.data_item_structures:
            if data_item_structure.type == DatumType.record:
                count += data_item_structure.get_record_size()[0]
            else:
                if data_item_structure.type != DatumType.keyword or count > 0:
                    if data_item_structure.repeating:
                        # count repeats as one extra record
                        repeating = True
                    count += 1
        return count, repeating

    def all_keywords(self):
        for data_item_structure in self.data_item_structures:
            if data_item_structure.type == DatumType.record:
                if not data_item_structure.all_keywords():
                    return False
            else:
                if data_item_structure.type != DatumType.keyword:
                    return False
        return True

    def get_type_string(self):
        type_array = []
        self.get_docstring_type_array(type_array)
        type_string = ", ".join(type_array)
        type_header = ""
        type_footer = ""
        if (
            len(self.data_item_structures) > 1
            or self.data_item_structures[0].repeating
        ):
            type_header = "["
            type_footer = "]"
            if self.repeating:
                type_footer = f"] ... [{type_string}]"
        return f"{type_header}{type_string}{type_footer}"
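
    # Illustrative sketch: for a recarray whose items are "cellid" and "q"
    # (as in a hypothetical stress period list), get_type_string() yields
    # "[cellid, q]"; if the data is flagged as repeating, it yields
    # "[cellid, q] ... [cellid, q]".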

    def get_docstring_type_array(self, type_array):
        for index, item in enumerate(self.data_item_structures):
            if item.type == DatumType.record:
                item.get_docstring_type_array(type_array)
            else:
                if self.display_item(index):
                    if (
                        self.type == DatumType.recarray
                        or self.type == DatumType.record
                        or self.type == DatumType.repeating_record
                    ):
                        type_array.append(str(item.name))
                    else:
                        type_array.append(str(self._resolve_item_type(item)))

    def get_description(
        self, line_size=79, initial_indent="        ", level_indent="    "
    ):
        type_array = []
        self.get_type_array(type_array)
        description = ""
        for datastr, index, itype in type_array:
            item = datastr.data_item_structures[index]
            if item is None:
                continue
            if item.type == DatumType.record:
                item_desc = item.get_description(
                    line_size, initial_indent + level_indent, level_indent
                )
                description = f"{description}\n{item_desc}"
            elif datastr.display_item(index):
                if len(description.strip()) > 0:
                    description = f"{description}\n"
                item_desc = item.description
                if item.numeric_index or item.is_cellid:
                    # append zero-based index text
                    item_desc = f"{item_desc} {numeric_index_text}"
                item_desc = f"* {item.name} ({itype}) {item_desc}"
                twr = TextWrapper(
                    width=line_size,
                    initial_indent=initial_indent,
                    subsequent_indent=f"  {initial_indent}",
                )
                item_desc = "\n".join(twr.wrap(item_desc))
                description = f"{description}{item_desc}"
                if item.type == DatumType.keystring:
                    keystr_desc = item.get_keystring_desc(
                        line_size, initial_indent + level_indent, level_indent
                    )
                    description = f"{description}\n{keystr_desc}"
        return description

    def get_subpackage_description(
        self, line_size=79, initial_indent="        ", level_indent="    "
    ):
        item_desc = (
            "* Contains data for the {} package. Data can be "
            "stored in a dictionary containing data for the {} "
            "package with variable names as keys and package data as "
            "values. Data just for the {} variable is also "
            "acceptable. See {} package documentation for more "
            "information.".format(
                self.construct_package,
                self.construct_package,
                self.parameter_name,
                self.construct_package,
            )
        )
        twr = TextWrapper(
            width=line_size,
            initial_indent=initial_indent,
            subsequent_indent=f"  {initial_indent}",
        )
        return "\n".join(twr.wrap(item_desc))

    def get_doc_string(
        self, line_size=79, initial_indent="    ", level_indent="    "
    ):
        if self.parameter_name is not None:
            description = self.get_subpackage_description(
                line_size, initial_indent + level_indent, level_indent
            )
            var_name = self.parameter_name
            type_name = f"{{varname:data}} or {self.construct_data} data"
        else:
            description = self.get_description(
                line_size, initial_indent + level_indent, level_indent
            )
            var_name = self.python_name
            type_name = self.get_type_string()

        param_doc_string = f"{var_name} : {type_name}"
        twr = TextWrapper(
            width=line_size,
            initial_indent=initial_indent,
            subsequent_indent=f"  {initial_indent}",
        )
        param_doc_string = "\n".join(twr.wrap(param_doc_string))
        param_doc_string = f"{param_doc_string}\n{description}"
        return param_doc_string

    def get_type_array(self, type_array):
        for index, item in enumerate(self.data_item_structures):
            if item.type == DatumType.record:
                item.get_type_array(type_array)
            else:
                if self.display_item(index):
                    type_array.append(
                        (
                            self,
                            index,
                            str(self._resolve_item_type(item)),
                        )
                    )

    def _resolve_item_type(self, item):
        item_type = item.type_string
        first_nk_idx = self.first_non_keyword_index()
        # single keyword is type boolean
        if item_type == "keyword" and len(self.data_item_structures) == 1:
            item_type = "boolean"
        if item.is_cellid:
            item_type = "(integer, ...)"
        # two keywords
        if len(self.data_item_structures) == 2 and first_nk_idx is None:
            # keyword type is string
            item_type = "string"
        return item_type

    def display_item(self, item_num):
        item = self.data_item_structures[item_num]
        first_nk_idx = self.first_non_keyword_index()
        # all keywords excluded if there is a non-keyword
        if not (item.type == DatumType.keyword and first_nk_idx is not None):
            # ignore first keyword if there are two keywords
            if (
                len(self.data_item_structures) == 2
                and first_nk_idx is None
                and item_num == 0
            ):
                return False
            return True
        return False

    def get_datum_type(self, numpy_type=False, return_enum_type=False):
        data_item_types = self.get_data_item_types()
        for var_type in data_item_types:
            if (
                var_type[0] == DatumType.double_precision
                or var_type[0] == DatumType.integer
                or var_type[0] == DatumType.string
            ):
                if return_enum_type:
                    return var_type[0]
                else:
                    if numpy_type:
                        if var_type[0] == DatumType.double_precision:
                            return np.float64
                        elif var_type[0] == DatumType.integer:
                            return np.int32
                        else:
                            return object
                    else:
                        return var_type[2]
        return None

    def get_data_item_types(self):
        data_item_types = []
        for data_item in self.data_item_structures:
            if data_item.type == DatumType.record:
                # record within a record
                data_item_types += data_item.get_data_item_types()
            else:
                data_item_types.append(
                    [data_item.type, data_item.type_string, data_item.type_obj]
                )
        return data_item_types

    def first_non_keyword_index(self):
        for index, data_item in enumerate(self.data_item_structures):
            if data_item.type != DatumType.keyword:
                return index
        return None

    def get_model(self):
        if self.model_data:
            if len(self.path) >= 1:
                return self.path[0]
        return None

    def get_package(self):
        if self.model_data:
            if len(self.path) >= 2:
                return self.path[1]
        else:
            if len(self.path) >= 1:
                return self.path[0]
        return ""


class MFBlockStructure:
    """
    Defines the structure of a MF6 block.

    Parameters
    ----------
    name : string
        block name
    path : tuple
        tuple that describes location of block within simulation
        (<model>, <package>, <block>)
    model_block : bool
        true if this block is part of a model
    parent_package : MFInputFileStructure
        structure object of the package containing this block

    Attributes
    ----------
    name : string
        block name
    path : tuple
        tuple that describes location of block within simulation
        (<model>, <package>, <block>)
    model_block : bool
        true if this block is part of a model
    data_structures : dict
        dictionary of data items in this block, with the data item name as
        the key
    block_header_structure : list
        list of data items that are part of this block's "header"

    Methods
    -------
    repeating() : bool
        Returns true if more than one instance of this block can appear in
        a MF6 package file
    add_dataset(dataset : MFDataStructure)
        Adds dataset to this block; datasets flagged as block variables are
        stored as block header datasets
    number_non_optional_data() : int
        Returns the number of non-optional non-header data structures in
        this block
    number_non_optional_block_header_data() : int
        Returns the number of non-optional block header data structures in
        this block
    get_data_structure(path : tuple) : MFDataStructure
        Returns the data structure in this block with name defined by
        path[0].  If name does not exist, returns None.
    get_all_recarrays() : list
        Returns all non-header data structures in this block that are of
        type recarray

    See Also
    --------

    Notes
    -----

    Examples
    --------
    """

    def __init__(self, name, path, model_block, parent_package):
        # initialize
        self.data_structures = {}
        self.block_header_structure = []
        self.name = name
        self.path = path + (self.name,)
        self.model_block = model_block
        self.parent_package = parent_package

    def repeating(self):
        return len(self.block_header_structure) > 0

    def add_dataset(self, dataset):
        dataset.set_path(self.path)
        if dataset.block_variable:
            self.block_header_structure.append(dataset)
        else:
            self.data_structures[dataset.name] = dataset

    def number_non_optional_data(self):
        num = 0
        for data_structure in self.data_structures.values():
            if not data_structure.optional:
                num += 1
        return num

    def number_non_optional_block_header_data(self):
        if (
            len(self.block_header_structure) > 0
            and not self.block_header_structure[0].optional
        ):
            return 1
        else:
            return 0

    def get_data_structure(self, path):
        if path[0] in self.data_structures:
            return self.data_structures[path[0]]
        else:
            return None

    def get_all_recarrays(self):
        recarray_list = []
        for item in self.data_structures.values():
            if item.type == DatumType.recarray:
                recarray_list.append(item)
        return recarray_list


class MFInputFileStructure:
    """
    MODFLOW Input File Structure class.  Loads file structure information
    for individual input file types.

    Parameters
    ----------
    dfn_file : DfnFile
        the definition file used to define the structure of this input file
    path : tuple
        path defining the location of the container of this input file
        structure within the overall simulation structure
    common : dict
        contents of the common dfn file
    model_file : bool
        this file belongs to a specific model type

    Attributes
    ----------
    valid : bool
        simulation structure validity
    path : tuple
        path defining the location of this input file structure within the
        overall simulation structure
    read_as_arrays : bool
        if this input file structure is the READASARRAYS version of a
        package

    Methods
    -------
    is_valid() : bool
        Checks all structures objects within the file for validity
    get_data_structure(path : string)
        Returns a data structure if it exists, otherwise returns None.
        Data structure type returned is based on the tuple/list "path"

    See Also
    --------

    Notes
    -----

    Examples
    --------
    """

    def __init__(self, dfn_file, path, common, model_file):
        # initialize
        self.valid = True
        self.file_type = dfn_file.package_type
        self.file_prefix = dfn_file.package_prefix
        self.dfn_type = dfn_file.dfn_type
        self.dfn_file_name = dfn_file.dfn_file_name
        self.description = ""
        self.path = path + (self.file_type,)
        self.model_file = model_file  # file belongs to a specific model
        self.read_as_arrays = False

        self.blocks, self.header = dfn_file.get_block_structure_dict(
            self.path,
            common,
            model_file,
            self,
        )
        self.has_packagedata = "packagedata" in self.blocks
        self.has_perioddata = "period" in self.blocks
        self.multi_package_support = "multi-package" in self.header
        self.stress_package = (
            "package-type" in self.header
            and self.header["package-type"] == "stress-package"
        )
        self.advanced_stress_package = (
            "package-type" in self.header
            and self.header["package-type"] == "advanced-stress-package"
        )
        self.dfn_list = dfn_file.dfn_list
        self.sub_package = self._sub_package()

    def advanced_package(self):
        return self.has_packagedata and self.has_perioddata

    def _sub_package(self):
        mfstruct = MFStructure()
        for value in mfstruct.flopy_dict.values():
            if value is not None and "construct_package" in value:
                if self.file_type == value["construct_package"]:
                    return True
        return False

    def is_valid(self):
        valid = True
        # self.blocks is a dict keyed by block name, so iterate its values
        for block in self.blocks.values():
            valid = valid and block.is_valid()
        return valid

    def get_data_structure(self, path):
        if isinstance(path, (tuple, list)):
            if path[0] in self.blocks:
                return self.blocks[path[0]].get_data_structure(path[1:])
            else:
                return None
        else:
            # look the name up in each block structure
            for block in self.blocks.values():
                if path in block.data_structures:
                    return block.data_structures[path]
            return None
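
# Illustrative sketch: data within a package structure can be located by
# tuple path or by plain name, e.g. for a hypothetical `pkg_struct`:
#
#     pkg_struct.get_data_structure(("options", "save_flows"))
#     pkg_struct.get_data_structure("save_flows")
#
# Both return the matching MFDataStructure, or None if it is not found.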
[docs]class MFModelStructure:
    """
    Defines the structure of an MF6 model and its packages

    Parameters
    ----------
    model_type : string
        abbreviation of model type
    utl_struct_objs : dict
        describes the structure of the utility packages available to the
        model

    Attributes
    ----------
    valid : bool
        simulation structure validity
    name_file_struct_obj : MFInputFileStructure
        describes the structure of the model name file
    package_struct_objs : dict
        describes the structure of the model's packages
    model_type : string
        abbreviation of model type

    Methods
    -------
    add_namefile : (dfn_file : DfnFile, model_file=True : bool)
        Adds a namefile structure object to the model
    add_package(dfn_file : DfnFile, model_file=True : bool)
        Adds a package structure object to the model
    is_valid() : bool
        Checks all structure objects within the model for validity
    get_data_structure(path : string)
        Returns a data structure if it exists, otherwise returns None.
        Data structure type returned is based on the tuple/list "path"

    See Also
    --------

    Notes
    -----

    Examples
    --------
    """

    def __init__(self, model_type, utl_struct_objs):
        # add name file structure
        self.model_type = model_type
        self.name_file_struct_obj = None
        self.package_struct_objs = {}
        self.utl_struct_objs = utl_struct_objs
[docs] def add_namefile(self, dfn_file, common):
        self.name_file_struct_obj = MFInputFileStructure(
            dfn_file, (self.model_type,), common, True
        )
[docs] def add_package(self, dfn_file, common):
        self.package_struct_objs[dfn_file.package_type] = MFInputFileStructure(
            dfn_file, (self.model_type,), common, True
        )
[docs] def get_package_struct(self, package_type):
        if package_type in self.package_struct_objs:
            return self.package_struct_objs[package_type]
        elif package_type in self.utl_struct_objs:
            return self.utl_struct_objs[package_type]
        else:
            return None
[docs] def is_valid(self):
        valid = True
        # package_struct_objs is keyed by package type, so iterate values
        for package_struct in self.package_struct_objs.values():
            valid = valid and package_struct.is_valid()
        return valid
[docs] def get_data_structure(self, path):
        if path[0] in self.package_struct_objs:
            if len(path) > 1:
                return self.package_struct_objs[path[0]].get_data_structure(
                    path[1:]
                )
            else:
                return self.package_struct_objs[path[0]]
        elif path[0] == "nam":
            if len(path) > 1:
                return self.name_file_struct_obj.get_data_structure(path[1:])
            else:
                return self.name_file_struct_obj
        else:
            return None
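# Illustrative sketch (not part of the flopy API): package lookups on a
# model structure fall back to the shared utility packages, and the
# special path element "nam" routes to the model name file.  "dis" and
# "obs" are assumed package types.
def _example_model_lookup(model_struct):
    dis_struct = model_struct.get_package_struct("dis")
    obs_struct = model_struct.get_package_struct("obs")  # utility fallback
    name_file = model_struct.get_data_structure(("nam",))
    return dis_struct, obs_struct, name_file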
[docs]class MFSimulationStructure:
    """
    Defines the structure of an MF6 simulation and its packages and models.

    Parameters
    ----------

    Attributes
    ----------
    name_file_struct_obj : MFInputFileStructure
        describes the structure of the simulation name file
    package_struct_objs : dict
        describes the structure of the simulation packages
    model_struct_objs : dict
        describes the structure of the supported model types
    utl_struct_objs : dict
        describes the structure of the supported utility packages
    common : dict
        common file information
    model_type : string
        placeholder

    Methods
    -------
    process_dfn : (dfn_file : DfnFile)
        reads in the contents of a dfn file, storing that contents in the
        appropriate object
    add_namefile : (dfn_file : DfnFile, model_file=True : bool)
        Adds a namefile structure object to the simulation
    add_util : (dfn_file : DfnFile)
        Adds a utility package structure object to the simulation
    add_package(dfn_file : DfnFile, model_file=True : bool)
        Adds a package structure object to the simulation
    store_common(dfn_file : DfnFile)
        Stores the contents of the common dfn file
    add_model(model_type : string)
        Adds a model structure object to the simulation
    is_valid() : bool
        Checks all structure objects within the simulation for validity
    get_data_structure(path : string)
        Returns a data structure if it exists, otherwise returns None.
        Data structure type returned is based on the tuple/list "path"
    tag_read_as_arrays
        Searches through all packages and tags any packages with a name
        that indicates they are the READASARRAYS version of a package.

    See Also
    --------

    Notes
    -----

    Examples
    --------
    """

    def __init__(self):
        # initialize
        self.name_file_struct_obj = None
        self.package_struct_objs = {}
        self.utl_struct_objs = {}
        self.model_struct_objs = {}
        self.common = None
        self.model_type = ""

    @property
    def model_types(self):
        model_type_list = []
        for model in self.model_struct_objs.values():
            model_type_list.append(model.model_type[:-1])
        return model_type_list
[docs] def process_dfn(self, dfn_file):
        if dfn_file.dfn_type == DfnType.common:
            self.store_common(dfn_file)
        elif dfn_file.dfn_type == DfnType.sim_name_file:
            self.add_namefile(dfn_file, False)
        elif (
            dfn_file.dfn_type == DfnType.sim_tdis_file
            or dfn_file.dfn_type == DfnType.exch_file
            or dfn_file.dfn_type == DfnType.ims_file
        ):
            self.add_package(dfn_file, False)
        elif dfn_file.dfn_type == DfnType.utl:
            self.add_util(dfn_file)
        elif (
            dfn_file.dfn_type == DfnType.model_file
            or dfn_file.dfn_type == DfnType.model_name_file
            or dfn_file.dfn_type == DfnType.gnc_file
            or dfn_file.dfn_type == DfnType.mvr_file
            or dfn_file.dfn_type == DfnType.mvt_file
        ):
            model_ver = (
                f"{dfn_file.model_type}"
                f"{MFStructure(True).get_version_string()}"
            )
            if model_ver not in self.model_struct_objs:
                self.add_model(model_ver)
            if dfn_file.dfn_type == DfnType.model_file:
                self.model_struct_objs[model_ver].add_package(
                    dfn_file, self.common
                )
            elif (
                dfn_file.dfn_type == DfnType.gnc_file
                or dfn_file.dfn_type == DfnType.mvr_file
                or dfn_file.dfn_type == DfnType.mvt_file
            ):
                # gnc, mvr, and mvt files belong at both the simulation
                # level and the model level
                self.model_struct_objs[model_ver].add_package(
                    dfn_file, self.common
                )
                self.add_package(dfn_file, False)
            else:
                self.model_struct_objs[model_ver].add_namefile(
                    dfn_file, self.common
                )
[docs] def add_namefile(self, dfn_file, model_file=True):
        self.name_file_struct_obj = MFInputFileStructure(
            dfn_file, (), self.common, model_file
        )
[docs] def add_util(self, dfn_file):
        self.utl_struct_objs[dfn_file.package_type] = MFInputFileStructure(
            dfn_file, (), self.common, True
        )
[docs] def add_package(self, dfn_file, model_file=True):
        self.package_struct_objs[dfn_file.package_type] = MFInputFileStructure(
            dfn_file, (), self.common, model_file
        )
[docs] def store_common(self, dfn_file):
        # store the contents of the common dfn file for later reference
        self.common = dfn_file.dict_by_name()
[docs] def add_model(self, model_type):
        self.model_struct_objs[model_type] = MFModelStructure(
            model_type, self.utl_struct_objs
        )
[docs] def is_valid(self):
        valid = True
        # both dicts are keyed by name, so iterate the structure objects
        for package_struct in self.package_struct_objs.values():
            valid = valid and package_struct.is_valid()
        for model_struct in self.model_struct_objs.values():
            valid = valid and model_struct.is_valid()
        return valid
[docs] def get_data_structure(self, path):
        if path[0] in self.package_struct_objs:
            if len(path) > 1:
                return self.package_struct_objs[path[0]].get_data_structure(
                    path[1:]
                )
            else:
                return self.package_struct_objs[path[0]]
        elif path[0] in self.model_struct_objs:
            if len(path) > 1:
                return self.model_struct_objs[path[0]].get_data_structure(
                    path[1:]
                )
            else:
                return self.model_struct_objs[path[0]]
        elif path[0] in self.utl_struct_objs:
            if len(path) > 1:
                return self.utl_struct_objs[path[0]].get_data_structure(
                    path[1:]
                )
            else:
                return self.utl_struct_objs[path[0]]
        elif path[0] == "nam":
            if len(path) > 1:
                return self.name_file_struct_obj.get_data_structure(path[1:])
            else:
                return self.name_file_struct_obj
        else:
            return None
[docs] def tag_read_as_arrays(self):
        for key, package_struct in self.package_struct_objs.items():
            if key[0:-1] in self.package_struct_objs and key[-1] == "a":
                package_struct.read_as_arrays = True
        for model_key, model_struct in self.model_struct_objs.items():
            for (
                key,
                package_struct,
            ) in model_struct.package_struct_objs.items():
                if (
                    key[0:-1] in model_struct.package_struct_objs
                    and key[-1] == "a"
                ):
                    package_struct.read_as_arrays = True
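# Illustrative sketch (not part of the flopy API): how a simulation
# structure is assembled from DfnFile objects.  process_dfn routes each
# file by its dfn_type, and tag_read_as_arrays then marks packages such
# as "rcha"/"evta" whose trailing "a" denotes a READASARRAYS variant.
def _example_build_sim_structure(dfn_files):
    sim_struct = MFSimulationStructure()
    # the common dfn should come first so store_common has populated
    # self.common before model packages reference it
    for dfn_file in dfn_files:
        sim_struct.process_dfn(dfn_file)
    sim_struct.tag_read_as_arrays()
    return sim_struct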
[docs]class MFStructure:
    """
    Singleton class for accessing the MODFLOW file structure metadata
    (only one instance of this class can exist; the structure is loaded
    on the first instantiation, either from dfn files or from flopy's
    package definitions)

    Attributes
    ----------
    mf_version : int
        version of MODFLOW
    valid : bool
        whether the structure information loaded from the dfn files is valid
    sim_struct : MFSimulationStructure
        Object containing file structure for all simulation files
    dimension_dict : dict
        Dictionary mapping paths to dimension information to the dataitem
        whose dimension information is being described
    """

    _instance = None

    def __new__(cls, internal_request=False, load_from_dfn_files=False):
        if cls._instance is None:
            cls._instance = super().__new__(cls)

            # Initialize variables
            cls._instance.mf_version = 6
            cls._instance.valid = True
            cls._instance.sim_struct = None
            cls._instance.dimension_dict = {}
            cls._instance.load_from_dfn_files = load_from_dfn_files
            cls._instance.flopy_dict = {}

            # Read metadata from file
            cls._instance.valid = cls._instance.__load_structure()
        elif not cls._instance.valid and not internal_request:
            if cls._instance.__load_structure():
                cls._instance.valid = True

        return cls._instance
[docs] def get_version_string(self):
        return str(self.mf_version)
    def __load_structure(self):
        # set up structure classes
        self.sim_struct = MFSimulationStructure()

        # initialize flopy dict keys
        MFStructure().flopy_dict["solution_packages"] = {}

        if self.load_from_dfn_files:
            mf_dfn = Dfn()
            dfn_files = mf_dfn.get_file_list()

            # get common
            common_dfn = DfnFile("common.dfn")
            self.sim_struct.process_dfn(common_dfn)

            # process each file's flopy header
            for file in dfn_files:
                dfn_path, tail = os.path.split(os.path.realpath(__file__))
                dfn_path = os.path.join(dfn_path, "dfn")
                dfn_file = os.path.join(dfn_path, file)
                with open(dfn_file) as fd_dfn:
                    for line in fd_dfn:
                        if len(line) < 1 or line[0] != "#":
                            break
                        line_lst = line.strip().split()
                        if len(line_lst) > 2 and line_lst[1] == "flopy":
                            # load flopy data
                            if (
                                line_lst[2] == "subpackage"
                                and len(line_lst) == 7
                            ):
                                sp_dict = {
                                    "construct_package": line_lst[4],
                                    "construct_data": line_lst[5],
                                    "parameter_name": line_lst[6],
                                }
                                MFStructure().flopy_dict[line_lst[3]] = sp_dict
                            elif line_lst[2] == "solution_package":
                                MFStructure().flopy_dict["solution_packages"][
                                    line_lst[3]
                                ] = line_lst[4:]
            if len(MFStructure().flopy_dict["solution_packages"]) == 0:
                MFStructure().flopy_dict["solution_packages"]["ims"] = ["*"]
                warnings.warn(
                    "Package definition files (dfn) do not define a solution "
                    "package. This can happen if your dfn files are out of "
                    "sync. Auto-loaded default IMS solution package metadata. "
                    "In the future, auto-loading default metadata will be "
                    "deprecated.",
                    DeprecationWarning,
                )

            # process each file
            for file in dfn_files:
                self.sim_struct.process_dfn(DfnFile(file))
            self.sim_struct.tag_read_as_arrays()
        else:
            package_list = PackageContainer.package_list()
            for package in package_list:
                # process header
                for entry in package.dfn[0][1:]:
                    if (
                        isinstance(entry, list)
                        and entry[0] == "solution_package"
                    ):
                        MFStructure().flopy_dict["solution_packages"][
                            package.package_abbr
                        ] = entry[1:]
                # process each package
                self.sim_struct.process_dfn(DfnPackage(package))
            self.sim_struct.tag_read_as_arrays()

        return True

    @staticmethod
    def __valid_line(line):
        if len(line.strip()) > 1 and line[0] != "#":
            return True
        return False
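# Illustrative sketch (not part of the flopy API): MFStructure is a
# singleton, so every construction returns the same loaded instance.
def _example_singleton_access():
    first = MFStructure()
    second = MFStructure()
    assert first is second  # one shared instance
    # the simulation structure and version string come from that instance
    return first.sim_struct, first.get_version_string()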