Source code for flopy.utils.compare

import os
import textwrap
from typing import List, Optional, Union

import numpy as np

from flopy.modflow import ModflowOc
from flopy.utils import FormattedHeadFile, HeadFile, HeadUFile
from flopy.utils.mfreadnam import get_entries_from_namefile


def _diffmax(v1, v2):
    """Calculate the maximum difference between two vectors.

    Parameters
    ----------
    v1 : numpy.ndarray
        array of base model results
    v2 : numpy.ndarray
        array of comparison model results

    Returns
    -------
    diffmax : float
        absolute value of the maximum difference in v1 and v2 array values
    indices : numpy.ndarry
        indices where the absolute value of the difference is equal to the
        absolute value of the maximum difference.

    """
    if v1.ndim > 1 or v2.ndim > 1:
        v1 = v1.flatten()
        v2 = v2.flatten()
    if v1.size != v2.size:
        err = (
            f"Error: calculate_difference v1 size ({v1.size}) "
            + f"is not equal to v2 size ({v2.size})"
        )
        raise Exception(err)

    diff = abs(v1 - v2)
    diffmax = diff.max()
    return diffmax, np.where(diff == diffmax)


def _difftol(v1, v2, tol):
    """Calculate the difference between two arrays relative to a tolerance.

    Parameters
    ----------
    v1 : numpy.ndarray
        array of base model results
    v2 : numpy.ndarray
        array of comparison model results
    tol : float
        tolerance used to evaluate base and comparison models

    Returns
    -------
    diffmax : float
        absolute value of the maximum difference in v1 and v2 array values
    indices : numpy.ndarry
        indices where the absolute value of the difference exceed the
        specified tolerance.

    """
    if v1.ndim > 1 or v2.ndim > 1:
        v1 = v1.flatten()
        v2 = v2.flatten()
    if v1.size != v2.size:
        err = (
            f"Error: calculate_difference v1 size ({v1.size}) "
            + f"is not equal to v2 size ({v2.size})"
        )
        raise Exception(err)

    diff = abs(v1 - v2)
    return diff.max(), np.where(diff > tol)


[docs]def compare_budget( namefile1: Optional[Union[str, os.PathLike]], namefile2: Optional[Union[str, os.PathLike]], max_cumpd=0.01, max_incpd=0.01, outfile: Optional[Union[str, os.PathLike]] = None, files1: Optional[ Union[str, os.PathLike, List[Union[str, os.PathLike]]] ] = None, files2: Optional[ Union[str, os.PathLike, List[Union[str, os.PathLike]]] ] = None, ): """Compare the budget results from two simulations. Parameters ---------- namefile1 : str or PathLike, optional namefile path for base model namefile2 : str or PathLike, optional namefile path for comparison model max_cumpd : float maximum percent discrepancy allowed for cumulative budget terms (default is 0.01) max_incpd : float maximum percent discrepancy allowed for incremental budget terms (default is 0.01) outfile : str or PathLike, optional budget comparison output file name. If outfile is None, no comparison output is saved. (default is None) files1 : str, PathLike, or list, optional base model output file. If files1 is not None, results will be extracted from files1 and namefile1 will not be used. (default is None) files2 : str, PathLike, or list, optional comparison model output file. If files2 is not None, results will be extracted from files2 and namefile2 will not be used. (default is None) Returns ------- success : bool boolean indicating if the difference between budgets are less than max_cumpd and max_incpd """ try: import flopy except: msg = "flopy not available - cannot use compare_budget" raise ValueError(msg) # headers headers = ("INCREMENTAL", "CUMULATIVE") direction = ("IN", "OUT") # Get name of list files lst_file1 = None if files1 is None: lst_file = get_entries_from_namefile(namefile1, "list") lst_file1 = lst_file[0][0] if any(lst_file) else None else: if isinstance(files1, (str, os.PathLike)): files1 = [files1] for file in files1: if ( "list" in os.path.basename(file).lower() or "lst" in os.path.basename(file).lower() ): lst_file1 = file break lst_file2 = None if files2 is None: lst_file = get_entries_from_namefile(namefile2, "list") lst_file2 = lst_file[0][0] if any(lst_file) else None else: if isinstance(files2, (str, os.PathLike)): files2 = [files2] for file in files2: if ( "list" in os.path.basename(file).lower() or "lst" in os.path.basename(file).lower() ): lst_file2 = file break # Determine if there are two files to compare if lst_file1 is None or lst_file2 is None: print("lst_file1 or lst_file2 is None") print(f"lst_file1: {lst_file1}") print(f"lst_file2: {lst_file2}") return True # Open output file if outfile is not None: f = open(outfile, "w") # Initialize SWR budget objects lst1obj = flopy.utils.MfusgListBudget(lst_file1) lst2obj = flopy.utils.MfusgListBudget(lst_file2) # Determine if there any SWR entries in the budget file if not lst1obj.isvalid() or not lst2obj.isvalid(): return True # Get numpy budget tables for lst_file1 lst1 = [] lst1.append(lst1obj.get_incremental()) lst1.append(lst1obj.get_cumulative()) # Get numpy budget tables for lst_file2 lst2 = [] lst2.append(lst2obj.get_incremental()) lst2.append(lst2obj.get_cumulative()) icnt = 0 v0 = np.zeros(2, dtype=float) v1 = np.zeros(2, dtype=float) err = np.zeros(2, dtype=float) # Process cumulative and incremental for idx in range(2): if idx > 0: max_pd = max_cumpd else: max_pd = max_incpd kper = lst1[idx]["stress_period"] kstp = lst1[idx]["time_step"] # Process each time step for jdx in range(kper.shape[0]): err[:] = 0.0 t0 = lst1[idx][jdx] t1 = lst2[idx][jdx] if outfile is not None: maxcolname = 0 for colname in t0.dtype.names: maxcolname = max(maxcolname, len(colname)) s = 2 * "\n" s += ( f"STRESS PERIOD: {kper[jdx] + 1} " + f"TIME STEP: {kstp[jdx] + 1}" ) f.write(s) if idx == 0: f.write("\nINCREMENTAL BUDGET\n") else: f.write("\nCUMULATIVE BUDGET\n") for i, colname in enumerate(t0.dtype.names): if i == 0: s = ( f"{'Budget Entry':<21} {'Model 1':>21} " + f"{'Model 2':>21} {'Difference':>21}\n" ) f.write(s) s = 87 * "-" + "\n" f.write(s) diff = t0[colname] - t1[colname] s = ( f"{colname:<21} {t0[colname]:>21} " + f"{t1[colname]:>21} {diff:>21}\n" ) f.write(s) v0[0] = t0["TOTAL_IN"] v1[0] = t1["TOTAL_IN"] if v0[0] > 0.0: err[0] = 100.0 * (v1[0] - v0[0]) / v0[0] v0[1] = t0["TOTAL_OUT"] v1[1] = t1["TOTAL_OUT"] if v0[1] > 0.0: err[1] = 100.0 * (v1[1] - v0[1]) / v0[1] for kdx, t in enumerate(err): if abs(t) > max_pd: icnt += 1 if outfile is not None: e = ( f'"{headers[idx]} {direction[kdx]}" ' + f"percent difference ({t})" + f" for stress period {kper[jdx] + 1} " + f"and time step {kstp[jdx] + 1} > {max_pd}." + f" Reference value = {v0[kdx]}. " + f"Simulated value = {v1[kdx]}." ) e = textwrap.fill( e, width=70, initial_indent=" ", subsequent_indent=" ", ) f.write(f"{e}\n") f.write("\n") # Close output file if outfile is not None: f.close() # test for failure success = True if icnt > 0: success = False return success
[docs]def compare_swrbudget( namefile1: Optional[Union[str, os.PathLike]], namefile2: Optional[Union[str, os.PathLike]], max_cumpd=0.01, max_incpd=0.01, outfile: Optional[Union[str, os.PathLike]] = None, files1: Optional[ Union[str, os.PathLike, List[Union[str, os.PathLike]]] ] = None, files2: Optional[ Union[str, os.PathLike, List[Union[str, os.PathLike]]] ] = None, ): """Compare the SWR budget results from two simulations. Parameters ---------- namefile1 : str or PathLike, optional namefile path for base model namefile2 : str or PathLike, optional namefile path for comparison model max_cumpd : float maximum percent discrepancy allowed for cumulative budget terms (default is 0.01) max_incpd : float maximum percent discrepancy allowed for incremental budget terms (default is 0.01) outfile : str or PathLike, optional budget comparison output file name. If outfile is None, no comparison output is saved. (default is None) files1 : str, PathLike, or list, optional base model output file. If files1 is not None, results will be extracted from files1 and namefile1 will not be used. (default is None) files2 : str, PathLike, or list, optional comparison model output file. If files2 is not None, results will be extracted from files2 and namefile2 will not be used. (default is None) Returns ------- success : bool boolean indicating if the difference between budgets are less than max_cumpd and max_incpd """ try: import flopy except: msg = "flopy not available - cannot use compare_swrbudget" raise ValueError(msg) # headers headers = ("INCREMENTAL", "CUMULATIVE") direction = ("IN", "OUT") # Get name of list files list1 = None if files1 is None: lst = get_entries_from_namefile(namefile1, "list") list1 = lst[0][0] if any(lst) else None else: for file in files1: if ( "list" in os.path.basename(file).lower() or "lst" in os.path.basename(file).lower() ): list1 = file break list2 = None if files2 is None: lst = get_entries_from_namefile(namefile2, "list") list2 = lst[0][0] if any(lst) else None else: for file in files2: if ( "list" in os.path.basename(file).lower() or "lst" in os.path.basename(file).lower() ): list2 = file break # Determine if there are two files to compare if list1 is None or list2 is None: return True # Initialize SWR budget objects lst1obj = flopy.utils.SwrListBudget(list1) lst2obj = flopy.utils.SwrListBudget(list2) # Determine if there any SWR entries in the budget file if not lst1obj.isvalid() or not lst2obj.isvalid(): return True # Get numpy budget tables for list1 lst1 = [] lst1.append(lst1obj.get_incremental()) lst1.append(lst1obj.get_cumulative()) # Get numpy budget tables for list2 lst2 = [] lst2.append(lst2obj.get_incremental()) lst2.append(lst2obj.get_cumulative()) icnt = 0 v0 = np.zeros(2, dtype=float) v1 = np.zeros(2, dtype=float) err = np.zeros(2, dtype=float) # Open output file if outfile is not None: f = open(outfile, "w") # Process cumulative and incremental for idx in range(2): if idx > 0: max_pd = max_cumpd else: max_pd = max_incpd kper = lst1[idx]["stress_period"] kstp = lst1[idx]["time_step"] # Process each time step for jdx in range(kper.shape[0]): err[:] = 0.0 t0 = lst1[idx][jdx] t1 = lst2[idx][jdx] if outfile is not None: maxcolname = 0 for colname in t0.dtype.names: maxcolname = max(maxcolname, len(colname)) s = 2 * "\n" s += ( f"STRESS PERIOD: {kper[jdx] + 1} " + f"TIME STEP: {kstp[jdx] + 1}" ) f.write(s) if idx == 0: f.write("\nINCREMENTAL BUDGET\n") else: f.write("\nCUMULATIVE BUDGET\n") for i, colname in enumerate(t0.dtype.names): if i == 0: s = ( f"{'Budget Entry':<21} {'Model 1':>21} " + f"{'Model 2':>21} {'Difference':>21}\n" ) f.write(s) s = 87 * "-" + "\n" f.write(s) diff = t0[colname] - t1[colname] s = ( f"{colname:<21} {t0[colname]:>21} " + f"{t1[colname]:>21} {diff:>21}\n" ) f.write(s) v0[0] = t0["TOTAL_IN"] v1[0] = t1["TOTAL_IN"] if v0[0] > 0.0: err[0] = 100.0 * (v1[0] - v0[0]) / v0[0] v0[1] = t0["TOTAL_OUT"] v1[1] = t1["TOTAL_OUT"] if v0[1] > 0.0: err[1] = 100.0 * (v1[1] - v0[1]) / v0[1] for kdx, t in enumerate(err): if abs(t) > max_pd: icnt += 1 e = ( f'"{headers[idx]} {direction[kdx]}" ' + f"percent difference ({t})" + f" for stress period {kper[jdx] + 1} " + f"and time step {kstp[jdx] + 1} > {max_pd}." + f" Reference value = {v0[kdx]}. " + f"Simulated value = {v1[kdx]}." ) e = textwrap.fill( e, width=70, initial_indent=" ", subsequent_indent=" ", ) f.write(f"{e}\n") f.write("\n") # Close output file if outfile is not None: f.close() # test for failure success = True if icnt > 0: success = False return success
[docs]def compare_heads( namefile1: Optional[Union[str, os.PathLike]], namefile2: Optional[Union[str, os.PathLike]], precision="auto", text="head", text2=None, htol=0.001, outfile: Optional[Union[str, os.PathLike]] = None, files1: Optional[ Union[str, os.PathLike, List[Union[str, os.PathLike]]] ] = None, files2: Optional[ Union[str, os.PathLike, List[Union[str, os.PathLike]]] ] = None, difftol=False, verbose=False, exfile: Optional[Union[str, os.PathLike]] = None, exarr=None, maxerr=None, ): """Compare the head results from two simulations. Parameters ---------- namefile1 : str or PathLike namefile path for base model namefile2 : str or PathLike namefile path for comparison model precision : str precision for binary head file ("auto", "single", or "double") default is "auto" htol : float maximum allowed head difference (default is 0.001) outfile : str or PathLike head comparison output file name. If outfile is None, no comparison output is saved. (default is None) files1 : str or PathLike, or List of str or PathLike base model output files. If files1 is not None, results will be extracted from files1 and namefile1 will not be used. (default is None) files2 : str or PathLike, or List of str or PathLike comparison model output files. If files2 is not None, results will be extracted from files2 and namefile2 will not be used. (default is None) difftol : bool boolean determining if the absolute value of the head difference greater than htol should be evaluated (default is False) verbose : bool boolean indicating if verbose output should be written to the terminal (default is False) exfile : str or PathLike, optional path to a file with exclusion array data. Head differences will not be evaluated where exclusion array values are greater than zero. (default is None) exarr : numpy.ndarry exclusion array. Head differences will not be evaluated where exclusion array values are greater than zero. (default is None). maxerr : int maximum number of head difference greater than htol that should be reported. If maxerr is None, all head difference greater than htol will be reported. (default is None) Returns ------- success : bool boolean indicating if the head differences are less than htol. """ if text2 is None: text2 = text dbs = "DATA(BINARY)" # Get head info for namefile1 hfpth1 = None status1 = dbs if files1 is None: # Get oc info, and return if OC not included in models ocf1 = get_entries_from_namefile(namefile1, "OC") if any(ocf1) is not None: return True hu1, hfpth1, du1, _ = ModflowOc.get_ocoutput_units(ocf1[0][0]) if text.lower() == "head": iut = hu1 elif text.lower() == "drawdown": iut = du1 if iut != 0: entries = get_entries_from_namefile(namefile1, unit=abs(iut)) hfpth1 = entries[0][0] if any(entries) else None status1 = entries[0][1] if any(entries) else None else: if isinstance(files1, (str, os.PathLike)): files1 = [files1] for file in files1: if text.lower() == "head": if ( "hds" in os.path.basename(file).lower() or "hed" in os.path.basename(file).lower() ): hfpth1 = file break elif text.lower() == "drawdown": if "ddn" in os.path.basename(file).lower(): hfpth1 = file break elif text.lower() == "concentration": if "ucn" in os.path.basename(file).lower(): hfpth1 = file break else: hfpth1 = file break # Get head info for namefile2 hfpth2 = None status2 = dbs if files2 is None: # Get oc info, and return if OC not included in models ocf2 = get_entries_from_namefile(namefile2, "OC") if not any(ocf2): return True hu2, hfpth2, du2, dfpth2 = ModflowOc.get_ocoutput_units(ocf2[0][0]) if text.lower() == "head": iut = hu2 elif text.lower() == "drawdown": iut = du2 if iut != 0: entries = get_entries_from_namefile(namefile2, unit=abs(iut)) hfpth2 = entries[0][0] if any(entries) else None status2 = entries[0][1] if any(entries) else None else: if isinstance(files2, (str, os.PathLike)): files2 = [files2] for file in files2: if text2.lower() == "head": if ( "hds" in os.path.basename(file).lower() or "hed" in os.path.basename(file).lower() ): hfpth2 = file break elif text2.lower() == "drawdown": if "ddn" in os.path.basename(file).lower(): hfpth2 = file break elif text2.lower() == "concentration": if "ucn" in os.path.basename(file).lower(): hfpth2 = file break else: hfpth2 = file break # confirm that there are two files to compare if hfpth1 is None or hfpth2 is None: print("hfpth1 or hfpth2 is None") print(f"hfpth1: {hfpth1}") print(f"hfpth2: {hfpth2}") return True # make sure the file paths exist if not os.path.isfile(hfpth1) or not os.path.isfile(hfpth2): print("hfpth1 or hfpth2 is not a file") print(f"hfpth1 isfile: {os.path.isfile(hfpth1)}") print(f"hfpth2 isfile: {os.path.isfile(hfpth2)}") return False # Open output file if outfile is not None: f = open(outfile, "w") f.write(f"Performing {text.upper()} to {text2.upper()} comparison\n") if exfile is not None: f.write(f"Using exclusion file {exfile}\n") if exarr is not None: f.write("Using exclusion array\n") msg = f"{hfpth1} is a " if status1 == dbs: msg += "binary file." else: msg += "ascii file." f.write(msg + "\n") msg = f"{hfpth2} is a " if status2 == dbs: msg += "binary file." else: msg += "ascii file." f.write(msg + "\n") # Process exclusion data exd = None # get data from exclusion file if exfile is not None: e = None if isinstance(exfile, (str, os.PathLike)): try: exd = np.genfromtxt(exfile).flatten() except: e = ( "Could not read exclusion " + f"file {os.path.basename(exfile)}" ) print(e) return False else: e = "exfile is not a valid file path" print(e) return False # process exclusion array if exarr is not None: e = None if isinstance(exarr, np.ndarray): if exd is None: exd = exarr.flatten() else: exd += exarr.flatten() else: e = "exarr is not a numpy array" print(e) return False # Get head objects status1 = status1.upper() unstructured1 = False if status1 == dbs: headobj1 = HeadFile( hfpth1, precision=precision, verbose=verbose, text=text ) txt = headobj1.recordarray["text"][0] if isinstance(txt, bytes): txt = txt.decode("utf-8") if "HEADU" in txt: unstructured1 = True headobj1 = HeadUFile(hfpth1, precision=precision, verbose=verbose) else: headobj1 = FormattedHeadFile(hfpth1, verbose=verbose, text=text) status2 = status2.upper() unstructured2 = False if status2 == dbs: headobj2 = HeadFile( hfpth2, precision=precision, verbose=verbose, text=text2 ) txt = headobj2.recordarray["text"][0] if isinstance(txt, bytes): txt = txt.decode("utf-8") if "HEADU" in txt: unstructured2 = True headobj2 = HeadUFile(hfpth2, precision=precision, verbose=verbose) else: headobj2 = FormattedHeadFile(hfpth2, verbose=verbose, text=text2) # get times times1 = headobj1.get_times() times2 = headobj2.get_times() for t1, t2 in zip(times1, times2): if not np.allclose([t1], [t2]): msg = "times in two head files are not " + f"equal ({t1},{t2})" raise ValueError(msg) kstpkper = headobj1.get_kstpkper() line_separator = 15 * "-" header = ( f"{' ':>15s} {' ':>15s} {'MAXIMUM':>15s} {'EXCEEDS':>15s}\n" + f"{'STRESS PERIOD':>15s} {'TIME STEP':>15s} " + f"{'HEAD DIFFERENCE':>15s} {'CRITERIA':>15s}\n" + f"{line_separator:>15s} {line_separator:>15s} " + f"{line_separator:>15s} {line_separator:>15s}\n" ) if verbose: print(f"Comparing results for {len(times1)} times") icnt = 0 # Process cumulative and incremental for idx, (t1, t2) in enumerate(zip(times1, times2)): h1 = headobj1.get_data(totim=t1) if unstructured1: temp = np.array([]) for a in h1: temp = np.hstack((temp, a)) h1 = temp h2 = headobj2.get_data(totim=t2) if unstructured2: temp = np.array([]) for a in h2: temp = np.hstack((temp, a)) h2 = temp if exd is not None: # reshape exd to the shape of the head arrays if idx == 0: e = ( f"shape of exclusion data ({exd.shape})" + "can not be reshaped to the size of the " + f"head arrays ({h1.shape})" ) if h1.flatten().shape != exd.shape: raise ValueError(e) exd = exd.reshape(h1.shape) iexd = exd > 0 # reset h1 and h2 to the same value in the excluded area h1[iexd] = 0.0 h2[iexd] = 0.0 if difftol: diffmax, indices = _difftol(h1, h2, htol) else: diffmax, indices = _diffmax(h1, h2) if outfile is not None: if idx < 1: f.write(header) if diffmax > htol: sexceed = "*" else: sexceed = "" kk1 = kstpkper[idx][1] + 1 kk0 = kstpkper[idx][0] + 1 f.write(f"{kk1:15d} {kk0:15d} {diffmax:15.6g} {sexceed:15s}\n") if diffmax >= htol: icnt += 1 if outfile is not None: if difftol: ee = ( "Maximum absolute head difference " + f"({diffmax}) -- " + f"{htol} tolerance exceeded at " + f"{indices[0].shape[0]} node location(s)" ) else: ee = ( "Maximum absolute head difference " + f"({diffmax}) exceeded " + f"at {indices[0].shape[0]} node location(s)" ) e = textwrap.fill( ee + ":", width=70, initial_indent=" ", subsequent_indent=" ", ) if verbose: f.write(f"{ee}\n") print(ee + f" at time {t1}") e = "" ncells = h1.flatten().shape[0] fmtn = "{:" + f"{len(str(ncells))}" + "d}" for itupe in indices: for jdx, ind in enumerate(itupe): iv = np.unravel_index(ind, h1.shape) iv = tuple(i + 1 for i in iv) v1 = h1.flatten()[ind] v2 = h2.flatten()[ind] d12 = v1 - v2 # e += ' ' + fmtn.format(jdx + 1) + ' node: ' # e += fmtn.format(ind + 1) # convert to one-based e += " " + fmtn.format(jdx + 1) e += f" {iv}" e += " -- " e += f"h1: {v1:20} " e += f"h2: {v2:20} " e += f"diff: {d12:20}\n" if isinstance(maxerr, int): if jdx + 1 >= maxerr: break if verbose: f.write(f"{e}\n") # Write header again, unless it is the last record if verbose: if idx + 1 < len(times1): f.write(f"\n{header}") # Close output file if outfile is not None: f.close() # test for failure success = True if icnt > 0: success = False return success
[docs]def compare_concentrations( namefile1: Union[str, os.PathLike], namefile2: Union[str, os.PathLike], precision="auto", ctol=0.001, outfile: Optional[Union[str, os.PathLike]] = None, files1: Optional[ Union[str, os.PathLike, List[Union[str, os.PathLike]]] ] = None, files2: Optional[ Union[str, os.PathLike, List[Union[str, os.PathLike]]] ] = None, difftol=False, verbose=False, ): """Compare the mt3dms and mt3dusgs concentration results from two simulations. Parameters ---------- namefile1 : str or PathLike namefile path for base model namefile2 : str or PathLike namefile path for comparison model precision : str precision for binary head file ("auto", "single", or "double") default is "auto" ctol : float maximum allowed concentration difference (default is 0.001) outfile : str or PathLike, optional concentration comparison output file name. If outfile is None, no comparison output is saved. (default is None) files1 : str, PathLike, or list, optional base model output file. If files1 is not None, results will be extracted from files1 and namefile1 will not be used. (default is None) files2 : str, PathLike, or list, optional comparison model output file. If files2 is not None, results will be extracted from files2 and namefile2 will not be used. (default is None) difftol : bool boolean determining if the absolute value of the concentration difference greater than ctol should be evaluated (default is False) verbose : bool boolean indicating if verbose output should be written to the terminal (default is False) Returns ------- success : bool boolean indicating if the concentration differences are less than ctol. Returns ------- """ try: import flopy except: msg = "flopy not available - cannot use compare_concs" raise ValueError(msg) # list of valid extensions valid_ext = ["ucn"] # Get info for first ucn file ufpth1 = None if files1 is None: for ext in valid_ext: ucn = get_entries_from_namefile(namefile1, extension=ext) ufpth = ucn[0][0] if any(ucn) else None if ufpth is not None: ufpth1 = ufpth break if ufpth1 is None: ufpth1 = os.path.join(os.path.dirname(namefile1), "MT3D001.UCN") else: if isinstance(files1, (str, os.PathLike)): files1 = [files1] for file in files1: for ext in valid_ext: if ext in os.path.basename(file).lower(): ufpth1 = file break # Get info for second ucn file ufpth2 = None if files2 is None: for ext in valid_ext: ucn = get_entries_from_namefile(namefile2, extension=ext) ufpth = ucn[0][0] if any(ucn) else None if ufpth is not None: ufpth2 = ufpth break if ufpth2 is None: ufpth2 = os.path.join(os.path.dirname(namefile2), "MT3D001.UCN") else: if isinstance(files2, (str, os.PathLike)): files2 = [files2] for file in files2: for ext in valid_ext: if ext in os.path.basename(file).lower(): ufpth2 = file break # confirm that there are two files to compare if ufpth1 is None or ufpth2 is None: if ufpth1 is None: print(" UCN file 1 not set") if ufpth2 is None: print(" UCN file 2 not set") return True if not os.path.isfile(ufpth1) or not os.path.isfile(ufpth2): if not os.path.isfile(ufpth1): print(f" {ufpth1} does not exist") if not os.path.isfile(ufpth2): print(f" {ufpth2} does not exist") return True # Open output file if outfile is not None: f = open(outfile, "w") # Get stage objects uobj1 = flopy.utils.UcnFile(ufpth1, precision=precision, verbose=verbose) uobj2 = flopy.utils.UcnFile(ufpth2, precision=precision, verbose=verbose) # get times times1 = uobj1.get_times() times2 = uobj2.get_times() nt1 = len(times1) nt2 = len(times2) nt = min(nt1, nt2) for t1, t2 in zip(times1, times2): if not np.allclose([t1], [t2]): msg = f"times in two ucn files are not equal ({t1},{t2})" raise ValueError(msg) if nt == nt1: kstpkper = uobj1.get_kstpkper() else: kstpkper = uobj2.get_kstpkper() line_separator = 15 * "-" header = ( f"{' ':>15s} {' ':>15s} {'MAXIMUM':>15s}\n" + f"{'STRESS PERIOD':>15s} {'TIME STEP':>15s} " + f"{'CONC DIFFERENCE':>15s}\n" + f"{line_separator:>15s} " + f"{line_separator:>15s} " + f"{line_separator:>15s}\n" ) if verbose: print(f"Comparing results for {len(times1)} times") icnt = 0 # Process cumulative and incremental for idx, time in enumerate(times1[0:nt]): try: u1 = uobj1.get_data(totim=time) u2 = uobj2.get_data(totim=time) if difftol: diffmax, indices = _difftol(u1, u2, ctol) else: diffmax, indices = _diffmax(u1, u2) if outfile is not None: if idx < 1: f.write(header) f.write( f"{kstpkper[idx][1] + 1:15d} " + f"{kstpkper[idx][0] + 1:15d} " + f"{diffmax:15.6g}\n" ) if diffmax >= ctol: icnt += 1 if outfile is not None: if difftol: ee = ( f"Maximum concentration difference ({diffmax})" + f" -- {ctol} tolerance exceeded at " + f"{indices[0].shape[0]} node location(s)" ) else: ee = ( "Maximum concentration difference " + f"({diffmax}) exceeded " + f"at {indices[0].shape[0]} node location(s)" ) e = textwrap.fill( ee + ":", width=70, initial_indent=" ", subsequent_indent=" ", ) f.write(f"{e}\n") if verbose: print(ee + f" at time {time}") e = "" for itupe in indices: for ind in itupe: e += f"{ind + 1} " # convert to one-based e = textwrap.fill( e, width=70, initial_indent=" ", subsequent_indent=" ", ) f.write(f"{e}\n") # Write header again, unless it is the last record if idx + 1 < len(times1): f.write(f"\n{header}") except: print(f" could not process time={time}") print(" terminating ucn processing...") break # Close output file if outfile is not None: f.close() # test for failure success = True if icnt > 0: success = False return success
[docs]def compare_stages( namefile1: Union[str, os.PathLike] = None, namefile2: Union[str, os.PathLike] = None, files1: Optional[ Union[str, os.PathLike, List[Union[str, os.PathLike]]] ] = None, files2: Optional[ Union[str, os.PathLike, List[Union[str, os.PathLike]]] ] = None, htol=0.001, outfile: Optional[Union[str, os.PathLike]] = None, difftol=False, verbose=False, ): """Compare SWR process stage results from two simulations. Parameters ---------- namefile1 : str or PathLike namefile path for base model namefile2 : str or PathLike namefile path for comparison model precision : str precision for binary head file ("auto", "single", or "double") default is "auto" htol : float maximum allowed stage difference (default is 0.001) outfile : str or PathLike, optional head comparison output file name. If outfile is None, no comparison output is saved. (default is None) files1 : str, PathLike, or list, optional base model output file. If files1 is not None, results will be extracted from files1 and namefile1 will not be used. (default is None) files2 : str, PathLike, or list, optional comparison model output file. If files2 is not None, results will be extracted from files2 and namefile2 will not be used. (default is None) difftol : bool boolean determining if the absolute value of the stage difference greater than htol should be evaluated (default is False) verbose : bool boolean indicating if verbose output should be written to the terminal (default is False) Returns ------- success : bool boolean indicating if the stage differences are less than htol. """ try: import flopy except: msg = "flopy not available - cannot use compare_stages" raise ValueError(msg) # list of valid extensions valid_ext = ["stg"] # Get info for first stage file sfpth1 = None if namefile1 is not None: for ext in valid_ext: stg = get_entries_from_namefile(namefile1, extension=ext) sfpth = stg[0][0] if any(stg) else None if sfpth is not None: sfpth1 = sfpth break elif files1 is not None: if isinstance(files1, (str, os.PathLike)): files1 = [files1] for file in files1: for ext in valid_ext: if ext in os.path.basename(file).lower(): sfpth1 = file break # Get info for second stage file sfpth2 = None if namefile2 is not None: for ext in valid_ext: stg = get_entries_from_namefile(namefile2, extension=ext) sfpth = stg[0][0] if any(stg) else None if sfpth is not None: sfpth2 = sfpth break elif files2 is not None: if isinstance(files2, (str, os.PathLike)): files2 = [files2] for file in files2: for ext in valid_ext: if ext in os.path.basename(file).lower(): sfpth2 = file break # confirm that there are two files to compare if sfpth1 is None or sfpth2 is None: print("spth1 or spth2 is None") print(f"spth1: {sfpth1}") print(f"spth2: {sfpth2}") return False if not os.path.isfile(sfpth1) or not os.path.isfile(sfpth2): print("spth1 or spth2 is not a file") print(f"spth1 isfile: {os.path.isfile(sfpth1)}") print(f"spth2 isfile: {os.path.isfile(sfpth2)}") return False # Open output file if outfile is not None: f = open(outfile, "w") # Get stage objects sobj1 = flopy.utils.SwrStage(sfpth1, verbose=verbose) sobj2 = flopy.utils.SwrStage(sfpth2, verbose=verbose) # get totim times1 = sobj1.get_times() # get kswr, kstp, and kper kk = sobj1.get_kswrkstpkper() line_separator = 15 * "-" header = ( f"{' ':>15s} {' ':>15s} {' ':>15s} {'MAXIMUM':>15s}\n" + f"{'STRESS PERIOD':>15s} " + f"{'TIME STEP':>15s} " + f"{'SWR TIME STEP':>15s} " + f"{'STAGE DIFFERENCE':>15s}\n" + f"{line_separator:>15s} " + f"{line_separator:>15s} " + f"{line_separator:>15s} " + f"{line_separator:>15s}\n" ) if verbose: print(f"Comparing results for {len(times1)} times") icnt = 0 # Process stage data for idx, (kon, time) in enumerate(zip(kk, times1)): s1 = sobj1.get_data(totim=time) s2 = sobj2.get_data(totim=time) if s1 is None or s2 is None: continue s1 = s1["stage"] s2 = s2["stage"] if difftol: diffmax, indices = _difftol(s1, s2, htol) else: diffmax, indices = _diffmax(s1, s2) if outfile is not None: if idx < 1: f.write(header) f.write( f"{kon[2] + 1:15d} " + f"{kon[1] + 1:15d} " + f"{kon[0] + 1:15d} " + f"{diffmax:15.6g}\n" ) if diffmax >= htol: icnt += 1 if outfile is not None: if difftol: ee = ( f"Maximum head difference ({diffmax}) -- " + f"{htol} tolerance exceeded at " + f"{indices[0].shape[0]} node location(s)" ) else: ee = ( "Maximum head difference " + f"({diffmax}) exceeded " + f"at {indices[0].shape[0]} node location(s):" ) e = textwrap.fill( ee + ":", width=70, initial_indent=" ", subsequent_indent=" ", ) f.write(f"{e}\n") if verbose: print(ee + f" at time {time}") e = "" for itupe in indices: for ind in itupe: e += f"{ind + 1} " # convert to one-based e = textwrap.fill( e, width=70, initial_indent=" ", subsequent_indent=" ", ) f.write(f"{e}\n") # Write header again, unless it is the last record if idx + 1 < len(times1): f.write(f"\n{header}") # Close output file if outfile is not None: f.close() # test for failure success = True if icnt > 0: success = False return success
[docs]def compare( namefile1: Union[str, os.PathLike] = None, namefile2: Union[str, os.PathLike] = None, precision="auto", max_cumpd=0.01, max_incpd=0.01, htol=0.001, outfile1: Optional[Union[str, os.PathLike]] = None, outfile2: Optional[Union[str, os.PathLike]] = None, files1: Optional[ Union[str, os.PathLike, List[Union[str, os.PathLike]]] ] = None, files2: Optional[ Union[str, os.PathLike, List[Union[str, os.PathLike]]] ] = None, ): """Compare the budget and head results for two MODFLOW-based model simulations. Parameters ---------- namefile1 : str or PathLike, optional namefile path for base model namefile2 : str or PathLike, optional namefile path for comparison model precision : str precision for binary head file ("auto", "single", or "double") default is "auto" max_cumpd : float maximum percent discrepancy allowed for cumulative budget terms (default is 0.01) max_incpd : float maximum percent discrepancy allowed for incremental budget terms (default is 0.01) htol : float maximum allowed head difference (default is 0.001) outfile1 : str or PathLike, optional budget comparison output file name. If outfile1 is None, no budget comparison output is saved. (default is None) outfile2 : str or PathLike, optional head comparison output file name. If outfile2 is None, no head comparison output is saved. (default is None) files1 : str, PathLike, or list, optional base model output file. If files1 is not None, results will be extracted from files1 and namefile1 will not be used. (default is None) files2 : str, PathLike, or list, optional comparison model output file. If files2 is not None, results will be extracted from files2 and namefile2 will not be used. (default is None) Returns ------- success : bool boolean indicating if the budget and head differences are less than max_cumpd, max_incpd, and htol. """ # Compare budgets from the list files in namefile1 and namefile2 success1 = compare_budget( namefile1, namefile2, max_cumpd=max_cumpd, max_incpd=max_incpd, outfile=outfile1, files1=files1, files2=files2, ) success2 = compare_heads( namefile1, namefile2, precision=precision, htol=htol, outfile=outfile2, files1=files1, files2=files2, ) success = False if success1 and success2: success = True return success
[docs]def eval_bud_diff(fpth: Union[str, os.PathLike], b0, b1, ia=None, dtol=1e-6): # To use this eval_bud_diff function on a gwf or gwt budget file, # the function may need ia, in order to exclude comparison of the residual # term, which is stored in the diagonal position of the flowja array. # The following code can be used to extract ia from the grb file. # get ia/ja from binary grid file # fname = '{}.dis.grb'.format(os.path.basename(sim.name)) # fpth = os.path.join(sim.simpath, fname) # grbobj = flopy.mf6.utils.MfGrdFile(fpth) # ia = grbobj._datadict['IA'] - 1 diffmax = 0.0 difftag = "None" difftime = None fail = False # build list of cbc data to retrieve avail = b0.get_unique_record_names() # initialize list for storing totals for each budget term terms cbc_keys = [] for t in avail: if isinstance(t, bytes): t = t.decode() t = t.strip() cbc_keys.append(t) # open a summary file and write header f = open(fpth, "w") line = f"{'Time':15s}" line += f" {'Datatype':15s}" line += f" {'File 1':15s}" line += f" {'File 2':15s}" line += f" {'Difference':15s}" f.write(line + "\n") f.write(len(line) * "-" + "\n") # get data from cbc file kk = b0.get_kstpkper() times = b0.get_times() for idx, (k, t) in enumerate(zip(kk, times)): v0sum = 0.0 v1sum = 0.0 for key in cbc_keys: v0 = b0.get_data(kstpkper=k, text=key)[0] v1 = b1.get_data(kstpkper=k, text=key)[0] if isinstance(v0, np.recarray): v0 = v0["q"].sum() v1 = v1["q"].sum() else: v0 = v0.flatten() v1 = v1.flatten() if key == "FLOW-JA-FACE": # Set residual (stored in diagonal of flowja) to zero if ia is None: raise Exception("ia is required for model flowja") idiagidx = ia[:-1] v0[idiagidx] = 0.0 v1[idiagidx] = 0.0 v0 = v0.sum() v1 = v1.sum() # sum all of the values if key != "AUXILIARY": v0sum += v0 v1sum += v1 diff = v0 - v1 if abs(diff) > abs(diffmax): diffmax = diff difftag = key difftime = t if abs(diff) > dtol: fail = True line = f"{t:15g}" line += f" {key:15s}" line += f" {v0:15g}" line += f" {v1:15g}" line += f" {diff:15g}" f.write(line + "\n") # evaluate the sums diff = v0sum - v1sum if abs(diff) > dtol: fail = True line = f"{t:15g}" line += f" {'TOTAL':15s}" line += f" {v0sum:15g}" line += f" {v1sum:15g}" line += f" {diff:15g}" f.write(line + "\n") msg = f"\nSummary of changes in {os.path.basename(fpth)}\n" msg += "-" * 72 + "\n" msg += f"Maximum cbc difference: {diffmax}\n" msg += f"Maximum cbc difference time: {difftime}\n" msg += f"Maximum cbc datatype: {difftag}\n" if fail: msg += f"Maximum cbc criteria exceeded: {dtol}" assert not fail, msg # close summary file and print the final message f.close() print(msg) msg = f"sum of first cbc file flows ({v0sum}) " + f"exceeds dtol ({dtol})" assert abs(v0sum) < dtol, msg msg = f"sum of second cbc file flows ({v1sum}) " + f"exceeds dtol ({dtol})" assert abs(v1sum) < dtol, msg