Source code for flopy.mf6.utils.mfsimlistfile

import os
import pathlib as pl
import re
import warnings

import numpy as np


[docs]class MfSimulationList: def __init__(self, file_name: os.PathLike): # Set up file reading if isinstance(file_name, str): file_name = pl.Path(file_name) if not file_name.is_file(): raise FileNotFoundError(f"file_name `{file_name}` not found") self.file_name = file_name self.f = open(file_name, "r", encoding="ascii", errors="replace") self.normal_termination = self._get_termination_message() self.memory_print_option = self._memory_print_option() @property def is_normal_termination(self) -> bool: """ Return boolean indicating if the simulation terminated normally Returns ------- success: bool Boolean indicating if the simulation terminated normally """ return self.normal_termination
[docs] def get_runtime( self, units: str = "seconds", simulation_timer: str = "elapsed" ) -> float: """ Get model runtimes from the simulation list file. Parameters ---------- units : str Units in which to return the timer. Acceptable values are 'seconds', 'minutes', 'hours' (default is 'seconds') simulation_timer : str Timer to return. Acceptable values are 'elapsed', 'formulate', 'solution' (default is 'elapsed') Returns ------- out : float Floating point value with the runtime in requested units. Returns NaN if runtime not found in list file """ UNITS = ( "seconds", "minutes", "hours", ) TIMERS = ( "elapsed", "formulate", "solution", ) TIMERS_DICT = { "elapsed": "Elapsed run time:", "formulate": "Total formulate time:", "solution": "Total solution time:", } simulation_timer = simulation_timer.lower() if simulation_timer not in TIMERS: msg = ( "simulation_timers input variable must be " + " ,".join(TIMERS) + f": {simulation_timer} was specified." ) raise ValueError(msg) units = units.lower() if units not in UNITS: msg = ( "units input variable must be " + ", ".join(UNITS) + f": {units} was specified." ) raise ValueError(msg) # rewind the file self._rewind_file() if simulation_timer == "elapsed": seekpoint = self._seek_to_string(TIMERS_DICT[simulation_timer]) self.f.seek(seekpoint) line = self.f.readline().strip() if line == "": return np.nan # parse floating point values from the Elapsed run time string times = list(map(float, re.findall(r"[+-]?[0-9.]+", line))) # pad an array with zeros and times with # [days, hours, minutes, seconds] times = np.array([0 for _ in range(4 - len(times))] + times) # convert all to seconds time2sec = np.array([24 * 60 * 60, 60 * 60, 60, 1]) times_sec = np.sum(times * time2sec) else: seekpoint = self._seek_to_string(TIMERS_DICT[simulation_timer]) self.f.seek(seekpoint) line = self.f.readline().strip() if line == "": return np.nan times_sec = float(line.split()[3]) # return time in the requested units if units == "seconds": return times_sec elif units == "minutes": return times_sec / 60.0 elif units == "hours": return times_sec / 60.0 / 60.0
[docs] def get_outer_iterations(self) -> int: """ Get the total outer iterations from the simulation list file. Parameters ---------- Returns ------- outer_iterations : float Sum of all TOTAL ITERATIONS found in the list file """ # initialize total_iterations outer_iterations = 0 # rewind the file self._rewind_file() while True: seekpoint = self._seek_to_string("CALLS TO NUMERICAL SOLUTION IN") self.f.seek(seekpoint) line = self.f.readline() if line == "": break outer_iterations += int(line.split()[0]) return outer_iterations
[docs] def get_total_iterations(self) -> int: """ Get the total number of iterations from the simulation list file. Parameters ---------- Returns ------- total_iterations : float Sum of all TOTAL ITERATIONS found in the list file """ # initialize total_iterations total_iterations = 0 # rewind the file self._rewind_file() while True: seekpoint = self._seek_to_string("TOTAL ITERATIONS") self.f.seek(seekpoint) line = self.f.readline() if line == "": break total_iterations += int(line.split()[0]) return total_iterations
[docs] def get_memory_usage( self, virtual: bool = False, units: str = "gigabytes", ) -> float: """ Get the simulation memory usage from the simulation list file. Parameters ---------- virtual : bool Return total or virtual memory usage (default is total) units : str Memory units for return results. Valid values are 'gigabytes', 'megabytes', 'kilobytes', and 'bytes' (default is 'gigabytes'). Returns ------- memory_usage : float Total memory usage for a simulation (in Gigabytes) """ # initialize memory_usage memory_usage = 0.0 # rewind the file self._rewind_file() tags = ( "MEMORY MANAGER TOTAL STORAGE BY DATA TYPE", "Total", "Virtual", ) while True: seekpoint = self._seek_to_string(tags[0]) self.f.seek(seekpoint) line = self.f.readline().strip() if line == "": break sim_units = line.split()[-1] unit_conversion = self._get_memory_unit_conversion( sim_units, return_units_str=units.upper(), ) if virtual: tag = tags[2] else: tag = tags[1] seekpoint = self._seek_to_string(tag) self.f.seek(seekpoint) line = self.f.readline() if line == "": break memory_usage = float(line.split()[-1]) * unit_conversion return memory_usage
[docs] def get_non_virtual_memory_usage(self): """ Returns ------- non_virtual: float Non-virtual memory usage, which is the difference between the total and virtual memory usage """ return self.get_memory_usage() - self.get_memory_usage(virtual=True)
[docs] def get_memory_summary(self, units: str = "gigabytes") -> dict: """ Get the summary memory information if it is available in the simulation list file. Summary memory information is only available if the memory_print_option is set to 'summary' in the simulation name file options block. Parameters ---------- units : str Memory units for return results. Valid values are 'gigabytes', 'megabytes', 'kilobytes', and 'bytes' (default is 'gigabytes'). Returns ------- memory_summary : dict dictionary with the total memory for each simulation component. None is returned if summary memory data is not present in the simulation listing file. """ # initialize the return variable memory_summary = None if self.memory_print_option != "summary": msg = ( "Cannot retrieve memory data using get_memory_summary() " + "since memory_print_option is not set to 'SUMMARY'. " + "Returning None." ) warnings.warn(msg, category=Warning) else: # rewind the file self._rewind_file() seekpoint = self._seek_to_string( "SUMMARY INFORMATION ON VARIABLES " + "STORED IN THE MEMORY MANAGER" ) self.f.seek(seekpoint) line = self.f.readline().strip() if line != "": sim_units = line.split()[-1] unit_conversion = self._get_memory_unit_conversion( sim_units, return_units_str=units.upper(), ) # read the header for k in range(3): _ = self.f.readline() terminator = 100 * "-" memory_summary = {} while True: line = self.f.readline().strip() if line == terminator: break data = line.split() memory_summary[data[0]] = float(data[-1]) * unit_conversion return memory_summary
[docs] def get_memory_all(self, units: str = "gigabytes") -> dict: """ Get a dictionary of the memory table written if it is available in the simulation list file. The memory table is only available if the memory_print_option is set to 'all' in the simulation name file options block. Parameters ---------- units : str Memory units for return results. Valid values are 'gigabytes', 'megabytes', 'kilobytes', and 'bytes' (default is 'gigabytes'). Returns ------- memory_all : dict dictionary with the memory information for each variable in the MODFLOW 6 memory manager. The dictionary keys are the full memory path for a variable (the memory path and variable name). The dictionary entry for each key includes the memory path, the variable name, data type, size, and memory used for each variable. None is returned if the memory table is not present in the simulation listing file. """ # initialize the return variable memory_all = None TYPE_SIZE = { "INTEGER": 4.0, "DOUBLE": 8.0, "LOGICAL": 4.0, "STRING": 1.0, } if self.memory_print_option != "all": msg = ( "Cannot retrieve memory data using get_memory_all() since " + "memory_print_option is not set to 'ALL'. Returning None." ) warnings.warn(msg, category=Warning) else: # rewind the file self._rewind_file() seekpoint = self._seek_to_string( "DETAILED INFORMATION ON VARIABLES " + "STORED IN THE MEMORY MANAGER" ) self.f.seek(seekpoint) line = self.f.readline().strip() if line != "": sim_units = "BYTES" unit_conversion = self._get_memory_unit_conversion( sim_units, return_units_str=units.upper(), ) # read the header for k in range(3): _ = self.f.readline() terminator = 173 * "-" memory_all = {} # read the data while True: line = self.f.readline().strip() if line == terminator: break if "STRING LEN=" in line: mempath = line[0:50].strip() varname = line[51:67].strip() data_type = line[68:84].strip() no_items = float(line[84:105].strip()) assoc_var = line[106:].strip() variable_bytes = ( TYPE_SIZE["STRING"] * float(data_type.replace("STRING LEN=", "")) * no_items ) else: data = line.split() mempath = data[0] varname = data[1] data_type = data[2] no_items = float(data[3]) assoc_var = data[4] variable_bytes = TYPE_SIZE[data_type] * no_items if assoc_var == "--": size_bytes = variable_bytes * unit_conversion memory_all[f"{mempath}/{varname}"] = { "MEMPATH": mempath, "VARIABLE": varname, "DATATYPE": data_type, "SIZE": no_items, "MEMORYSIZE": size_bytes, } return memory_all
def _seek_to_string(self, s): """ Parameters ---------- s : str Seek through the file to the next occurrence of s. Return the seek location when found. Returns ------- seekpoint : int Next location of the string """ while True: seekpoint = self.f.tell() line = self.f.readline() if line == "": break if s in line: break return seekpoint def _rewind_file(self): """ Rewind the simulation list file Returns ------- """ self.f.seek(0) def _get_termination_message(self) -> bool: """ Determine if the simulation terminated normally Returns ------- success: bool Boolean indicating if the simulation terminated normally """ # rewind the file self._rewind_file() seekpoint = self._seek_to_string("Normal termination of simulation.") self.f.seek(seekpoint) line = self.f.readline().strip() if line == "": success = False else: success = True return success def _get_memory_unit_conversion( self, sim_units_str: str, return_units_str, ) -> float: """ Calculate memory unit conversion factor that converts from reported units to gigabytes Parameters ---------- sim_units_str : str Memory Units in the simulation listing file. Valid values are 'GIGABYTES', 'MEGABYTES', 'KILOBYTES', or 'BYTES'. Returns ------- unit_conversion : float Unit conversion factor """ valid_units = ( "GIGABYTES", "MEGABYTES", "KILOBYTES", "BYTES", ) if sim_units_str not in valid_units: raise ValueError(f"Unknown memory unit '{sim_units_str}'") factor = [1.0, 1e-3, 1e-6, 1e-9] if return_units_str == "MEGABYTES": factor = [v * 1e3 for v in factor] elif return_units_str == "KILOBYTES": factor = [v * 1e6 for v in factor] elif return_units_str == "BYTES": factor = [v * 1e9 for v in factor] factor_dict = {tag: factor[idx] for idx, tag in enumerate(valid_units)} return factor_dict[sim_units_str] def _memory_print_option(self) -> str: """ Determine the memory print option selected Returns ------- option: str memory_print_option ('summary', 'all', or None) """ # rewind the file self._rewind_file() seekpoint = self._seek_to_string("MEMORY_PRINT_OPTION SET TO") self.f.seek(seekpoint) line = self.f.readline().strip() if line == "": option = None else: option_list = re.findall(r'"([^"]*)"', line) if len(option_list) < 1: raise LookupError( "could not parse memory_print_option from" + f"'{line}'" ) option = option_list[-1].lower() if option not in ("all", "summary"): raise ValueError(f"unknown memory print option {option}") return option