# Source code for flopy.utils.flopy_io

"""
Module for input/output utilities
"""

import os
import platform
import sys
from pathlib import Path
from shutil import which
from typing import Union

import numpy as np
import pandas as pd


def _fmt_string(array, float_format="{}"):
    """
    makes a formatting string for a rec-array;
    given a desired float_format.

    Parameters
    ----------
    array : np.recarray
    float_format : str
        formatter for floating point variable

    Returns
    -------
    fmt_string : str
        formatting string for writing output
    """
    fmt_string = ""
    for field in array.dtype.descr:
        vtype = field[1][1].lower()
        if vtype == "i":
            fmt_string += "{:.0f} "
        elif vtype == "f":
            fmt_string += f"{float_format} "
        elif vtype == "o":
            fmt_string += "{} "
        elif vtype == "s":
            raise Exception(
                "MfList error: 'str' type found in dtype. "
                "This gives unpredictable results when "
                "recarray to file - change to 'object' type"
            )
        else:
            raise Exception(
                f"MfList.fmt_string error: unknown vtype in dtype:{vtype}"
            )
    return fmt_string


def line_strip(line):
    """
    Remove comments and replace commas from input text
    for a free formatted modflow input file.

    Parameters
    ----------
    line : str
        a line of text from a modflow input file

    Returns
    -------
    str : line with comments removed and commas replaced
    """
    # truncate at the first comment marker, then normalize separators
    for marker in (";", "#", "!!"):
        line = line.split(marker, 1)[0]
    return line.strip().replace(",", " ")
def multi_line_strip(fobj):
    """
    Get next line that is not blank or is not a comment line
    from a free formatted modflow input file.

    Parameters
    ----------
    fobj : open file object
        a line of text from an input file

    Returns
    -------
    str : lowercased line with comments removed and commas replaced
    """
    # NOTE(review): at EOF readline() returns "" forever, so this loops
    # indefinitely — callers presumably guarantee a non-blank line exists
    line = line_strip(fobj.readline())
    while not line:
        line = line_strip(fobj.readline())
    return line.lower()
def get_next_line(f):
    """
    Get the next line from a file that is not a blank line.

    Parameters
    ----------
    f : filehandle
        filehandle to a open file

    Returns
    -------
    line : string
        next non-empty line in a open file (right-stripped only, so
        leading whitespace is preserved)
    """
    # NOTE(review): at EOF readline() returns "" forever, so this loops
    # indefinitely — callers presumably guarantee a non-blank line exists
    line = ""
    while not line:
        line = f.readline().rstrip()
    return line
def line_parse(line):
    """
    Convert a line of text into to a list of values.  This handles the
    case where a free formatted MODFLOW input file may have commas in
    it.
    """
    return line_strip(line).split()
def pop_item(line, dtype=str):
    """
    Remove and convert the first token of a token list.

    Parameters
    ----------
    line : list of str
        list of string tokens; the first element is popped in place
        when a conversion is performed
    dtype : type
        target type (str, float, or int)

    Returns
    -------
    first token cast to dtype; dtype(0) when the list is empty or dtype
    is not one of str, float, or int (in which case nothing is popped)
    """
    if len(line) > 0:
        if dtype == str:
            return line.pop(0)
        if dtype == float:
            return float(line.pop(0))
        if dtype == int:
            # go through float so strings like '-10.' still parse as int
            return int(float(line.pop(0)))
    return dtype(0)
def write_fixed_var(v, length=10, ipos=None, free=False, comment=None):
    """
    Format a list of values as a fixed- or free-format text record.

    Parameters
    ----------
    v : list, int, float, bool, or numpy array
        list, int, float, bool, or numpy array containing the data to be
        written to a string.
    length : int
        length of each column for fixed column widths. (default is 10)
    ipos : list, int, or numpy array
        user-provided column widths. (default is None)
    free : bool
        boolean indicating if a free format string should be generated.
        length and ipos are not used if free is True. (default is False)
    comment : str
        comment string to add to the end of the string

    Returns
    -------
    out : str
        fixed or free format string generated using user-provided data,
        terminated with a newline
    """
    # normalize the input to a plain list
    if isinstance(v, np.ndarray):
        v = v.tolist()
    elif isinstance(v, (int, float, bool)):
        v = [v]
    ncol = len(v)

    # construct the column widths if they were not passed
    if ipos is None:
        ipos = [length] * ncol
    else:
        if isinstance(ipos, np.ndarray):
            ipos = ipos.flatten().tolist()
        elif isinstance(ipos, int):
            ipos = [ipos]
        if len(ipos) < ncol:
            raise Exception(
                "user provided ipos length ({}) should be greater than or "
                "equal to the length of v ({})".format(len(ipos), ncol)
            )

    pieces = []
    for idx, value in enumerate(v):
        if free:
            fmt = "{} "
        else:
            width = ipos[idx]
            if isinstance(value, (float, np.float32, np.float64)):
                decimal = width - 6
                lo, hi = 10**-decimal, 10**decimal
                if abs(value) < lo or abs(value) > hi:
                    # default precision is 6 if not specified
                    ctype = "g"
                else:
                    ctype = f".{decimal}f"
                    # if the fixed format value would exceed the width,
                    # fall back to general format to preserve precision
                    if len(f"{{:>{width}{ctype}}}".format(value)) > width:
                        ctype = f".{decimal}g"
            elif isinstance(value, (int, np.int32, np.int64)):
                ctype = "d"
            else:
                ctype = ""
            fmt = f"{{:>{width}{ctype}}}"
        pieces.append(fmt.format(value))

    out = "".join(pieces)
    if comment is not None:
        out += f" # {comment}"
    return out + "\n"
def read_fixed_var(line, ncol=1, length=10, ipos=None, free=False):
    """
    Parse a fixed format line using user provided data.

    Parameters
    ----------
    line : str
        text string to parse.
    ncol : int
        number of columns to parse from line. (default is 1)
    length : int
        length of each column for fixed column widths. (default is 10)
    ipos : list, int, or numpy array
        user-provided column widths; when given, it overrides ncol and
        length. (default is None)
    free : bool
        boolean indicating if string is free format. ncol, length, and
        ipos are not used if free is True. (default is False)

    Returns
    -------
    out : list
        padded list containing data parsed from the passed text string;
        blank fixed-width columns are returned as the integer 0
    """
    if free:
        return line_parse(line)

    # construct the column widths if they were not passed
    if ipos is None:
        ipos = [length] * ncol
    else:
        if isinstance(ipos, np.ndarray):
            ipos = ipos.flatten().tolist()
        elif isinstance(ipos, int):
            ipos = [ipos]
        ncol = len(ipos)

    line = line.rstrip()
    out = []
    istart = 0
    for ivar in range(ncol):
        istop = istart + ipos[ivar]
        try:
            # slicing past the end of a str yields "", so short lines
            # simply produce padded zeros rather than raising
            txt = line[istart:istop]
        except Exception:  # narrowed from a bare except; defensive only
            break
        out.append(txt if txt.strip() else 0)
        istart = istop
    return out
def flux_to_wel(cbc_file, text, precision="single", model=None, verbose=False):
    """
    Convert flux in a binary cell budget file to a wel instance

    Parameters
    ----------
    cbc_file : (str) cell budget file name
    text : (str) text string of the desired flux type (e.g. "drains")
    precision : (optional str) precision of the cell budget file
    model : (optional) BaseModel instance.  If passed, a new ModflowWel
        instance will be added to model
    verbose : bool flag passed to CellBudgetFile

    Returns
    -------
    flopy.modflow.ModflowWel instance
    """
    # delayed imports to avoid a circular dependency with the package root
    from ..modflow import Modflow, ModflowWel
    from . import CellBudgetFile as CBF
    from .util_list import MfList

    cbf = CBF(cbc_file, precision=precision, verbose=verbose)

    # create a empty numpy array of shape (time,layer,row,col)
    m4d = np.zeros((cbf.nper, cbf.nlay, cbf.nrow, cbf.ncol), dtype=np.float32)
    m4d[:] = np.nan

    # process the records in the cell budget file
    iper = -1
    for kstpkper in cbf.kstpkper:
        # convert 1-based (kstp, kper) to 0-based indices
        kstpkper = (kstpkper[0] - 1, kstpkper[1] - 1)
        kper = kstpkper[1]
        # if we haven't visited this kper yet
        if kper != iper:
            arr = cbf.get_data(kstpkper=kstpkper, text=text, full3D=True)
            if len(arr) > 0:
                arr = arr[0]
                # NOTE(review): looks like a leftover debug print — confirm
                print(arr.max(), arr.min(), arr.sum())
                # masked where zero
                arr[np.where(arr == 0.0)] = np.nan
                # NOTE(review): m4d is indexed with iper + 1 before iper is
                # incremented — assumes kstpkper records arrive ordered by
                # stress period; verify against CellBudgetFile behavior
                m4d[iper + 1] = arr
            iper += 1

    # model wasn't passed, then create a generic model
    if model is None:
        model = Modflow("test")
    # if model doesn't have a wel package, then make a generic one...
    # need this for the from_m4d method
    if model.wel is None:
        ModflowWel(model)

    # get the stress_period_data dict {kper:np recarray}
    sp_data = MfList.from_4d(model, "WEL", {"flux": m4d})

    wel = ModflowWel(model, stress_period_data=sp_data)
    return wel
def loadtxt(
    file, delimiter=" ", dtype=None, skiprows=0, use_pandas=True, **kwargs
):
    """
    Use pandas to load a text file (significantly faster than n.loadtxt or
    genfromtxt see https://stackoverflow.com/q/18259393/)

    Parameters
    ----------
    file : file or str
        File, filename, or generator to read.
    delimiter : str, optional
        The string used to separate values. By default, this is any
        whitespace.
    dtype : data-type, optional
        Data-type of the resulting array
    skiprows : int, optional
        Skip the first skiprows lines; default: 0.
    use_pandas : bool
        If true, the much faster pandas.read_csv method is used.
    kwargs : dict
        Keyword arguments passed to numpy.loadtxt or pandas.read_csv.

    Returns
    -------
    ra : np.recarray
        Numpy record array of file contents.
    """
    # (the original checked `use_pandas` twice; collapsed to one branch)
    if use_pandas:
        # pandas needs an explicit regex for whitespace delimiters
        if delimiter.isspace():
            kwargs["sep"] = r"\s+"
        # propagate field names from a structured dtype if not given
        if isinstance(dtype, np.dtype) and "names" not in kwargs:
            kwargs["names"] = dtype.names
        df = pd.read_csv(file, dtype=dtype, skiprows=skiprows, **kwargs)
        return df.to_records(index=False)
    # default use of numpy
    return np.loadtxt(file, dtype=dtype, skiprows=skiprows, **kwargs)
def get_url_text(url, error_msg=None):
    """
    Get text from a url.

    Parameters
    ----------
    url : str
        url to fetch
    error_msg : str, optional
        extra message to print when the fetch fails

    Returns
    -------
    str or None
        decoded response body, or None on any failure
    """
    from urllib.request import urlopen

    try:
        # close the response even if read/decode fails
        # (the original leaked the connection and used a bare except)
        with urlopen(url) as urlobj:
            return urlobj.read().decode()
    except Exception:
        # best-effort helper: report the failure and return None
        print(sys.exc_info())
        if error_msg is not None:
            print(error_msg)
        return None
def ulstrd(f, nlist, ra, model, sfac_columns, ext_unit_dict):
    """
    Read a list and allow for open/close, binary, external, sfac, etc.

    Parameters
    ----------
    f : file handle
        file handle for where the list is being read from
    nlist : int
        size of the list (number of rows) to read
    ra : np.recarray
        A record array of the correct size that will be filled with the list
    model : model object
        The model object (of type :class:`flopy.modflow.mf.Modflow`) to
        which this package will be added.
    sfac_columns : list
        A list of strings containing the column names to scale by sfac
    ext_unit_dict : dictionary, optional
        If the list in the file is specified using EXTERNAL, then in this
        case ext_unit_dict is required, which can be constructed using the
        function :class:`flopy.utils.mfreadnam.parsenamefile`.

    Returns
    -------
    ra : np.recarray
        the filled (or, for binary reads, newly created) record array
    """
    # initialize variables
    line = f.readline()
    sfac = 1.0
    binary = False
    ncol = len(ra.dtype.names)
    line_list = line_parse(line)
    close_the_file = False
    file_handle = f
    mode = "r"

    # check for external
    if line.strip().lower().startswith("external"):
        inunit = int(line_list[1])
        errmsg = f"Could not find a file for unit {inunit}"
        if ext_unit_dict is not None:
            if inunit in ext_unit_dict:
                namdata = ext_unit_dict[inunit]
                file_handle = namdata.filehandle
            else:
                raise OSError(errmsg)
        else:
            raise OSError(errmsg)
        # namdata is always bound here: every other path above raised
        if namdata.filetype == "DATA(BINARY)":
            binary = True
        if not binary:
            line = file_handle.readline()

    # or check for open/close
    elif line.strip().lower().startswith("open/close"):
        raw = line.strip().split()
        fname = raw[1]
        # normalize either separator style into an os-specific path
        if "/" in fname:
            raw = fname.split("/")
        elif "\\" in fname:
            raw = fname.split("\\")
        else:
            raw = [fname]
        fname = os.path.join(*raw)
        oc_filename = os.path.join(model.model_ws, fname)
        msg = f"Package.load() error: open/close filename {oc_filename} not found"
        assert os.path.exists(oc_filename), msg
        if "(binary)" in line.lower():
            binary = True
            mode = "rb"
        file_handle = open(oc_filename, mode)
        close_the_file = True
        if not binary:
            line = file_handle.readline()

    # check for scaling factor
    if not binary:
        if line.strip().lower().startswith("sfac"):
            line_list = line_parse(line)
            sfac = float(line_list[1])
            line = file_handle.readline()

    # fast binary read fromfile
    if binary:
        # NOTE(review): assumes every field in the binary record is a
        # float32 — TODO confirm against the MODFLOW binary list format
        dtype2 = []
        for name in ra.dtype.names:
            dtype2.append((name, np.float32))
        dtype2 = np.dtype(dtype2)
        d = np.fromfile(file_handle, dtype=dtype2, count=nlist)
        # rebind ra: binary reads replace the caller-supplied array
        ra = np.array(d, dtype=ra.dtype)
        ra = ra.view(np.recarray)

    # else, read ascii
    else:
        for ii in range(nlist):
            # first line was already read
            if ii != 0:
                line = file_handle.readline()
            if model.free_format_input:
                # whitespace separated; pad short rows with 0.0
                t = line_parse(line)
                if len(t) < ncol:
                    t = t + (ncol - len(t)) * [0.0]
                else:
                    t = t[:ncol]
                t = tuple(t)
                ra[ii] = t
            else:
                # fixed format
                t = read_fixed_var(line, ncol=ncol)
                t = tuple(t)
                ra[ii] = t

    # scale the data and check
    for column_name in sfac_columns:
        ra[column_name] *= sfac
        # also apply the auxiliary scale factor column when present
        if "auxsfac" in ra.dtype.names:
            ra[column_name] *= ra["auxsfac"]

    if close_the_file:
        file_handle.close()

    return ra
def get_ts_sp(line):
    """
    Reader method to get time step and stress period numbers from
    list files and Modflow other output files

    Parameters
    ----------
    line : str
        line containing information about the stress period and time step.
        The line must contain "STRESS PERIOD <x> TIME STEP <y>"

    Returns
    -------
    tuple of stress period and time step numbers
    """

    def first_int_after(tag):
        # parse the token immediately following the search string
        start = line.index(tag) + len(tag)
        return int(line_parse(line[start:])[0])

    # Get rid of nasty things
    line = line.replace(",", "").replace("*", "")
    ts = first_int_after("TIME STEP")
    sp = first_int_after("STRESS PERIOD")
    return ts, sp
def relpath_safe(
    path: Union[str, os.PathLike],
    start: Union[str, os.PathLike] = os.curdir,
    scrub: bool = False,
) -> str:
    """
    Return a relative version of the path starting at the given start path.

    This is impossible on Windows if the paths are on different drives, in
    which case the absolute path is returned. The builtin os.path.relpath
    raises a ValueError, this method is a workaround to avoid interrupting
    normal control flow (background at https://bugs.python.org/issue7195).

    This method also truncates/obfuscates absolute paths with usernames.

    Parameters
    ----------
    path : str or PathLike
        the path to truncate relative to the start path
    start : str or PathLike, default "."
        the starting path, defaults to the current working directory
    scrub : bool, default False
        whether to remove the current login name from paths

    Returns
    -------
    str : the relative path, unless the platform is Windows and the
    `path` is not on the same drive as `start`, in which case the
    absolute path, with elements before and including usernames
    removed and obfuscated
    """
    if start == os.curdir:
        start = os.getcwd()

    if platform.system() == "Windows":
        abs_path = os.path.abspath(path)
        abs_start = os.path.abspath(start)
        # relpath raises ValueError across drives; detect that up front
        same_drive = (
            os.path.splitdrive(abs_path)[0].lower()
            == os.path.splitdrive(abs_start)[0].lower()
        )
        if same_drive:
            rel = os.path.relpath(abs_path, abs_start)
        else:
            rel = os.path.abspath(path)
    else:
        rel = os.path.relpath(path, start)

    return scrub_login(rel) if scrub else rel
def scrub_login(s: str) -> str:
    """
    Remove the current login name from the given string,
    replacing any occurrences with "***".

    Parameters
    ----------
    s : str
        the input string

    Returns
    -------
    the string with login name obfuscated (returned unchanged when the
    login name cannot be determined)
    """
    try:
        name = os.getlogin()
    except OSError:
        # OSError is possible in CI, e.g. 'No such device or address'
        return s
    return s.replace(name, "***")