Source code for flopy.utils.flopy_io

"""
Module for input/output utilities
"""
import os
import sys
import numpy as np

try:
    import pandas as pd
except ImportError:
    pd = False


def _fmt_string(array, float_format="{}"):
    """
    makes a formatting string for a rec-array;
    given a desired float_format.

    Parameters
    ----------
    array : np.recarray
    float_format : str
        formatter for floating point variable

    Returns
    -------
    fmt_string : str
        formatting string for writing output
    """
    fmt_string = ""
    for field in array.dtype.descr:
        vtype = field[1][1].lower()
        if vtype == "i":
            fmt_string += "{:.0f} "
        elif vtype == "f":
            fmt_string += "{} ".format(float_format)
        elif vtype == "o":
            fmt_string += "{} "
        elif vtype == "s":
            raise Exception(
                "MfList error: 'str' type found in dtype."
                + " This gives unpredictable results when writing the "
                + "recarray to file - change to 'object' type"
            )
        else:
            raise Exception(
                "MfList.fmt_string error: unknown vtype in dtype: " + vtype
            )
    return fmt_string
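
# Usage sketch (not part of the original module; the dtype below is only an
# illustration): _fmt_string turns a recarray dtype into one format token per
# column.
# >>> dtype = np.dtype([("k", np.int32), ("flux", np.float32)])
# >>> arr = np.zeros(1, dtype=dtype).view(np.recarray)
# >>> _fmt_string(arr, float_format="{:15.7E}")
# '{:.0f} {:15.7E} '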


def line_strip(line):
    """
    Remove comments and replace commas from input text
    for a free formatted modflow input file

    Parameters
    ----------
    line : str
        a line of text from a modflow input file

    Returns
    -------
    str : line with comments removed and commas replaced
    """
    for comment_flag in [";", "#", "!!"]:
        line = line.split(comment_flag)[0]
    line = line.strip()
    return line.replace(",", " ")

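# Usage sketch (illustrative input, not part of the original module):
# line_strip drops inline comments and turns commas into whitespace.
# >>> line_strip("10, 20, 30  # layer indices")
# '10  20  30'
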
def multi_line_strip(fobj):
    """
    Get the next line that is not blank and is not a comment line
    from a free formatted modflow input file

    Parameters
    ----------
    fobj : open file object
        open file object for a modflow input file

    Returns
    -------
    str : line with comments removed and commas replaced
    """
    while True:
        line = line_strip(fobj.readline())
        if line:
            return line.lower()

def get_next_line(f):
    """
    Get the next line from a file that is not a blank line

    Parameters
    ----------
    f : filehandle
        filehandle to an open file

    Returns
    -------
    line : string
        next non-empty line in an open file
    """
    while True:
        line = f.readline().rstrip()
        if len(line) > 0:
            break
    return line

def line_parse(line):
    """
    Convert a line of text into a list of values.  This handles the
    case where a free formatted MODFLOW input file may have commas in
    it.
    """
    line = line_strip(line)
    return line.split()

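# Usage sketch (illustrative input): line_parse strips comments and commas,
# then splits the remainder into tokens.
# >>> line_parse("10, 20, 30.5 ; trailing comment")
# ['10', '20', '30.5']
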
def pop_item(line, dtype=str):
    """
    Pop the first item from a list of string tokens and cast it to dtype,
    returning dtype(0) if the list is empty.
    """
    if len(line) > 0:
        if dtype == str:
            return line.pop(0)
        elif dtype == float:
            return float(line.pop(0))
        elif dtype == int:
            # handle strings like this:
            # '-10.'
            return int(float(line.pop(0)))
    return dtype(0)

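# Usage sketch (illustrative tokens): pop_item consumes the list in place and
# falls back to dtype(0) when the list is empty.
# >>> tokens = ["10.", "2"]
# >>> pop_item(tokens, int)
# 10
# >>> pop_item(tokens, float)
# 2.0
# >>> pop_item([], float)
# 0.0
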
def write_fixed_var(v, length=10, ipos=None, free=False, comment=None):
    """
    Write a fixed or free format string from a list of values.

    Parameters
    ----------
    v : list, int, float, bool, or numpy array
        list, int, float, bool, or numpy array containing the data to be
        written to a string.
    length : int
        length of each column for fixed column widths. (default is 10)
    ipos : list, int, or numpy array
        user-provided column widths. (default is None)
    free : bool
        boolean indicating if a free format string should be generated.
        length and ipos are not used if free is True. (default is False)
    comment : str
        comment string to add to the end of the string

    Returns
    -------
    out : str
        fixed or free format string generated using user-provided data
    """
    if isinstance(v, np.ndarray):
        v = v.tolist()
    elif isinstance(v, (int, float, bool)):
        v = [v]
    ncol = len(v)
    # construct ipos if it was not passed
    if ipos is None:
        ipos = []
        for i in range(ncol):
            ipos.append(length)
    else:
        if isinstance(ipos, np.ndarray):
            ipos = ipos.flatten().tolist()
        elif isinstance(ipos, int):
            ipos = [ipos]
        if len(ipos) < ncol:
            err = (
                "user provided ipos length ({}) ".format(len(ipos))
                + "should be greater than or equal "
                + "to the length of v ({})".format(ncol)
            )
            raise Exception(err)
    out = ""
    for n in range(ncol):
        if free:
            write_fmt = "{} "
        else:
            if isinstance(v[n], (float, np.floating)):
                width = ipos[n] - 6
                vmin, vmax = 10 ** -width, 10 ** width
                if abs(v[n]) < vmin or abs(v[n]) > vmax:
                    ctype = "g"
                else:
                    ctype = ".{}f".format(width)
            elif isinstance(v[n], (int, np.integer)):
                ctype = "d"
            else:
                ctype = ""
            write_fmt = "{{:>{}{}}}".format(ipos[n], ctype)
        out += write_fmt.format(v[n])
    if comment is not None:
        out += " # {}".format(comment)
    out += "\n"
    return out

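# Usage sketch (illustrative values): the same data written as fixed width
# (10-character columns) and as free format.
# >>> write_fixed_var([1, 2.5, "CHD"], length=10)
# '         1    2.5000       CHD\n'
# >>> write_fixed_var([1, 2.5, "CHD"], free=True)
# '1 2.5 CHD \n'
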
def read_fixed_var(line, ncol=1, length=10, ipos=None, free=False):
    """
    Parse a fixed format line using user provided data

    Parameters
    ----------
    line : str
        text string to parse.
    ncol : int
        number of columns to parse from line. (default is 1)
    length : int
        length of each column for fixed column widths. (default is 10)
    ipos : list, int, or numpy array
        user-provided column widths. (default is None)
    free : bool
        boolean indicating if the string is free format. ncol, length,
        and ipos are not used if free is True. (default is False)

    Returns
    -------
    out : list
        padded list containing data parsed from the passed text string
    """
    if free:
        out = line.rstrip().split()
    else:
        # construct ipos if it was not passed
        if ipos is None:
            ipos = []
            for i in range(ncol):
                ipos.append(length)
        else:
            if isinstance(ipos, np.ndarray):
                ipos = ipos.flatten().tolist()
            elif isinstance(ipos, int):
                ipos = [ipos]
            ncol = len(ipos)
        line = line.rstrip()
        out = []
        istart = 0
        for ivar in range(ncol):
            istop = istart + ipos[ivar]
            try:
                txt = line[istart:istop]
                if len(txt.strip()) > 0:
                    out.append(txt)
                else:
                    out.append(0)
            except:
                break
            istart = istop
    return out

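# Usage sketch (illustrative values): fixed format parsing returns the raw
# 10-character slices, free format parsing just splits on whitespace.
# >>> read_fixed_var("         1    2.5000       CHD", ncol=3)
# ['         1', '    2.5000', '       CHD']
# >>> read_fixed_var("1 2.5 CHD", free=True)
# ['1', '2.5', 'CHD']
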
def flux_to_wel(cbc_file, text, precision="single", model=None, verbose=False):
    """
    Convert flux in a binary cell budget file to a wel instance

    Parameters
    ----------
    cbc_file : str
        cell budget file name
    text : str
        text string of the desired flux type (e.g. "drains")
    precision : str, optional
        precision of the cell budget file
    model : BaseModel, optional
        BaseModel instance.  If passed, a new ModflowWel instance will
        be added to model
    verbose : bool
        flag passed to CellBudgetFile

    Returns
    -------
    flopy.modflow.ModflowWel instance
    """
    from . import CellBudgetFile as CBF
    from .util_list import MfList
    from ..modflow import Modflow, ModflowWel

    cbf = CBF(cbc_file, precision=precision, verbose=verbose)

    # create an empty numpy array of shape (time, layer, row, col)
    m4d = np.zeros((cbf.nper, cbf.nlay, cbf.nrow, cbf.ncol), dtype=np.float32)
    m4d[:] = np.nan

    # process the records in the cell budget file
    iper = -1
    for kstpkper in cbf.kstpkper:
        kstpkper = (kstpkper[0] - 1, kstpkper[1] - 1)
        kper = kstpkper[1]
        # if we haven't visited this kper yet
        if kper != iper:
            arr = cbf.get_data(kstpkper=kstpkper, text=text, full3D=True)
            if len(arr) > 0:
                arr = arr[0]
                print(arr.max(), arr.min(), arr.sum())
                # masked where zero
                arr[np.where(arr == 0.0)] = np.nan
                m4d[iper + 1] = arr
            iper += 1

    # if model wasn't passed, create a generic model
    if model is None:
        model = Modflow("test")
    # if model doesn't have a wel package, then make a generic one...
    # need this for the from_4d method
    if model.wel is None:
        ModflowWel(model)

    # get the stress_period_data dict {kper: np recarray}
    sp_data = MfList.from_4d(model, "WEL", {"flux": m4d})

    wel = ModflowWel(model, stress_period_data=sp_data)
    return wel

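# Usage sketch (the budget file name and record text are hypothetical and
# depend on the model being post-processed):
# >>> wel = flux_to_wel("model.cbc", text="DRAINS", precision="single")
# >>> wel.stress_period_data  # MfList keyed by stress period
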
def loadtxt(
    file, delimiter=" ", dtype=None, skiprows=0, use_pandas=True, **kwargs
):
    """
    Use pandas if it is available to load a text file
    (significantly faster than numpy.loadtxt or genfromtxt; see
    http://stackoverflow.com/questions/18259393/numpy-loading-csv-too-slow-compared-to-matlab)

    Parameters
    ----------
    file : file or str
        File, filename, or generator to read.
    delimiter : str, optional
        The string used to separate values. By default, this is any
        whitespace.
    dtype : data-type, optional
        Data-type of the resulting array
    skiprows : int, optional
        Skip the first skiprows lines; default: 0.
    use_pandas : bool
        If true, the much faster pandas.read_csv method is used.
    kwargs : dict
        Keyword arguments passed to numpy.loadtxt or pandas.read_csv.

    Returns
    -------
    ra : np.recarray
        Numpy record array of file contents.
    """
    # test if pandas should be used, if available
    if use_pandas:
        if pd:
            if delimiter.isspace():
                kwargs["delim_whitespace"] = True
            if isinstance(dtype, np.dtype) and "names" not in kwargs:
                kwargs["names"] = dtype.names

    # if use_pandas and pd then use pandas
    if use_pandas and pd:
        df = pd.read_csv(file, dtype=dtype, skiprows=skiprows, **kwargs)
        return df.to_records(index=False)
    # default use of numpy
    else:
        return np.loadtxt(file, dtype=dtype, skiprows=skiprows, **kwargs)

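# Usage sketch (the file name and dtype are hypothetical): loadtxt returns a
# record array, reading through pandas when it is importable.
# >>> dt = np.dtype([("node", int), ("head", float)])
# >>> ra = loadtxt("heads.dat", dtype=dt)
# >>> ra["head"]
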
def get_url_text(url, error_msg=None):
    """
    Get text from a url.
    """
    from urllib.request import urlopen

    try:
        urlobj = urlopen(url)
        text = urlobj.read().decode()
        return text
    except:
        e = sys.exc_info()
        print(e)
        if error_msg is not None:
            print(error_msg)
        return

def ulstrd(f, nlist, ra, model, sfac_columns, ext_unit_dict):
    """
    Read a list and allow for open/close, binary, external, sfac, etc.

    Parameters
    ----------
    f : file handle
        file handle for where the list is being read from
    nlist : int
        size of the list (number of rows) to read
    ra : np.recarray
        A record array of the correct size that will be filled with the list
    model : model object
        The model object (of type :class:`flopy.modflow.mf.Modflow`) to
        which this package will be added.
    sfac_columns : list
        A list of strings containing the column names to scale by sfac
    ext_unit_dict : dictionary, optional
        If the list in the file is specified using EXTERNAL, then in this
        case ext_unit_dict is required, which can be constructed using the
        function :class:`flopy.utils.mfreadnam.parsenamefile`.

    Returns
    -------
    ra : np.recarray
        the record array filled with the list data
    """
    # initialize variables
    line = f.readline()
    sfac = 1.0
    binary = False
    ncol = len(ra.dtype.names)
    line_list = line.strip().split()
    close_the_file = False
    file_handle = f
    mode = "r"

    # check for external
    if line.strip().lower().startswith("external"):
        inunit = int(line_list[1])
        errmsg = "Could not find a file for unit {}".format(inunit)
        if ext_unit_dict is not None:
            if inunit in ext_unit_dict:
                namdata = ext_unit_dict[inunit]
                file_handle = namdata.filehandle
            else:
                raise IOError(errmsg)
        else:
            raise IOError(errmsg)
        if namdata.filetype == "DATA(BINARY)":
            binary = True
        if not binary:
            line = file_handle.readline()

    # or check for open/close
    elif line.strip().lower().startswith("open/close"):
        raw = line.strip().split()
        fname = raw[1]
        if "/" in fname:
            raw = fname.split("/")
        elif "\\" in fname:
            raw = fname.split("\\")
        else:
            raw = [fname]
        fname = os.path.join(*raw)
        oc_filename = os.path.join(model.model_ws, fname)
        msg = (
            "Package.load() error: open/close filename "
            + oc_filename
            + " not found"
        )
        assert os.path.exists(oc_filename), msg
        if "(binary)" in line.lower():
            binary = True
            mode = "rb"
        file_handle = open(oc_filename, mode)
        close_the_file = True
        if not binary:
            line = file_handle.readline()

    # check for scaling factor
    if not binary:
        line_list = line.strip().split()
        if line.strip().lower().startswith("sfac"):
            sfac = float(line_list[1])
            line = file_handle.readline()

    # fast binary read using fromfile
    if binary:
        dtype2 = []
        for name in ra.dtype.names:
            dtype2.append((name, np.float32))
        dtype2 = np.dtype(dtype2)
        d = np.fromfile(file_handle, dtype=dtype2, count=nlist)
        ra = np.array(d, dtype=ra.dtype)
        ra = ra.view(np.recarray)

    # else, read ascii
    else:
        for ii in range(nlist):
            # first line was already read
            if ii != 0:
                line = file_handle.readline()

            if model.free_format_input:
                # whitespace separated
                t = line.strip().split()
                if len(t) < ncol:
                    t = t + (ncol - len(t)) * [0.0]
                else:
                    t = t[:ncol]
                ra[ii] = tuple(t)
            else:
                # fixed format
                t = read_fixed_var(line, ncol=ncol)
                ra[ii] = tuple(t)

    # scale the data by sfac (and auxsfac, if present)
    for column_name in sfac_columns:
        ra[column_name] *= sfac
        if "auxsfac" in ra.dtype.names:
            ra[column_name] *= ra["auxsfac"]

    if close_the_file:
        file_handle.close()

    return ra