#!/usr/bin/env python3
"""Download and install USGS MODFLOW and related programs.
This script originates from FloPy: https://github.com/modflowpy/flopy
This file can be downloaded and run independently outside FloPy.
It requires Python 3.6 or later, and has no dependencies.
See https://developer.github.com/v3/repos/releases/ for GitHub Releases API.
"""
import json
import os
import shutil
import sys
import tempfile
import urllib
import urllib.request
import warnings
import zipfile
from importlib.util import find_spec
from pathlib import Path
from platform import processor
__all__ = ["run_main"]
__license__ = "CC0"
from typing import Dict, List, Tuple
default_owner = "MODFLOW-ORG"
default_repo = "executables"
# key is the repo name, value is the renamed file prefix for the download
renamed_prefix = {
"modflow6": "modflow6",
"executables": "modflow_executables",
"modflow6-nightly-build": "modflow6_nightly",
}
available_repos = list(renamed_prefix.keys())
max_http_tries = 3
# Check if this is running from flopy
within_flopy = False
spec = find_spec("flopy")
if spec is not None:
within_flopy = (
Path(spec.origin).resolve().parent in Path(__file__).resolve().parents
)
del spec
# local flopy install location (selected with :flopy)
flopy_appdata_path = (
Path(os.path.expandvars(r"%LOCALAPPDATA%\flopy"))
if sys.platform.startswith("win")
else Path.home() / ".local" / "share" / "flopy"
)
def get_ostag() -> str:
"""Determine operating system tag from sys.platform."""
if sys.platform.startswith("linux"):
return "linux"
elif sys.platform.startswith("win"):
return "win" + ("64" if sys.maxsize > 2**32 else "32")
elif sys.platform.startswith("darwin"):
arch = processor()
return "mac" + (arch if arch == "arm" else "")
raise ValueError(f"platform {sys.platform!r} not supported")
def get_suffixes(ostag) -> Tuple[str, str]:
if ostag.startswith("win"):
return ".exe", ".dll"
elif ostag == "linux":
return "", ".so"
elif "mac" in ostag:
return "", ".dylib"
else:
raise KeyError(f"unrecognized ostag {ostag!r}")
def get_request(url, params={}):
"""Get urllib.request.Request, with parameters and headers.
This bears GITHUB_TOKEN if it is set as an environment variable.
"""
if isinstance(params, dict):
if len(params) > 0:
url += "?" + urllib.parse.urlencode(params)
else:
raise TypeError("data must be a dict")
headers = {}
github_token = os.environ.get("GITHUB_TOKEN")
if github_token:
headers["Authorization"] = f"Bearer {github_token}"
return urllib.request.Request(url, headers=headers)
def get_releases(owner=None, repo=None, quiet=False, per_page=None) -> List[str]:
"""Get list of available releases."""
owner = default_owner if owner is None else owner
repo = default_repo if repo is None else repo
req_url = f"https://api.github.com/repos/{owner}/{repo}/releases"
params = {}
if per_page is not None:
if per_page < 1 or per_page > 100:
raise ValueError("per_page must be between 1 and 100")
params["per_page"] = per_page
request = get_request(req_url, params=params)
num_tries = 0
while True:
num_tries += 1
try:
with urllib.request.urlopen(request, timeout=10) as resp:
result = resp.read()
break
except urllib.error.HTTPError as err:
if err.code == 401 and os.environ.get("GITHUB_TOKEN"):
raise ValueError("GITHUB_TOKEN env is invalid") from err
elif err.code == 403 and "rate limit exceeded" in err.reason:
raise ValueError(
f"use GITHUB_TOKEN env to bypass rate limit ({err})"
) from err
elif err.code in {404, 503} and num_tries < max_http_tries:
# GitHub sometimes returns this error for valid URLs, so retry
print(f"URL request {num_tries} did not work ({err})")
continue
raise RuntimeError(f"cannot retrieve data from {req_url}") from err
releases = json.loads(result.decode())
if not quiet:
print(f"found {len(releases)} releases for {owner}/{repo}")
avail_releases = ["latest"]
avail_releases.extend(release["tag_name"] for release in releases)
return avail_releases
def get_release(owner=None, repo=None, tag="latest", quiet=False) -> dict:
"""Get info about a particular release."""
owner = default_owner if owner is None else owner
repo = default_repo if repo is None else repo
api_url = f"https://api.github.com/repos/{owner}/{repo}"
req_url = (
f"{api_url}/releases/latest"
if tag == "latest"
else f"{api_url}/releases/tags/{tag}"
)
request = get_request(req_url)
releases = None
num_tries = 0
while True:
num_tries += 1
try:
with urllib.request.urlopen(request, timeout=10) as resp:
result = resp.read()
remaining = resp.headers.get("x-ratelimit-remaining", None)
if remaining and int(remaining) <= 10:
warnings.warn(
f"Only {remaining} GitHub API requests remaining "
"before rate-limiting"
)
break
except urllib.error.HTTPError as err:
if err.code == 401 and os.environ.get("GITHUB_TOKEN"):
raise ValueError("GITHUB_TOKEN env is invalid") from err
elif err.code == 403 and "rate limit exceeded" in err.reason:
raise ValueError(
f"use GITHUB_TOKEN env to bypass rate limit ({err})"
) from err
elif err.code == 404:
if releases is None:
releases = get_releases(owner, repo, quiet)
if tag not in releases:
raise ValueError(
f"Release {tag} not found (choose from {', '.join(releases)})"
)
elif err.code == 503 and num_tries < max_http_tries:
# GitHub sometimes returns this error for valid URLs, so retry
warnings.warn(f"URL request {num_tries} did not work ({err})")
continue
raise RuntimeError(f"cannot retrieve data from {req_url}") from err
release = json.loads(result.decode())
tag_name = release["tag_name"]
if not quiet:
print(f"fetched release {tag_name!r} info from {owner}/{repo}")
return release
def columns_str(items, line_chars=79) -> str:
"""Return str of columns of items, similar to 'ls' command."""
item_chars = max(len(item) for item in items)
num_cols = line_chars // item_chars
if num_cols == 0:
num_cols = 1
num_rows = len(items) // num_cols
if len(items) % num_cols != 0:
num_rows += 1
lines = []
for row_num in range(num_rows):
row_items = items[row_num::num_rows]
lines.append(" ".join(item.ljust(item_chars) for item in row_items).rstrip())
return "\n".join(lines)
def get_bindir_options(previous=None) -> Dict[str, Tuple[Path, str]]:
"""Generate install location options based on platform and filesystem access."""
options = {} # key is an option name, value is (optpath, optinfo)
if previous is not None and os.access(previous, os.W_OK):
# Make previous bindir as the first option
options[":prev"] = (previous, "previously selected bindir")
if within_flopy: # don't check is_dir() or access yet
options[":flopy"] = (flopy_appdata_path / "bin", "used by FloPy")
# Python bin (same for standard or conda varieties)
py_bin = Path(sys.prefix) / ("Scripts" if get_ostag().startswith("win") else "bin")
if py_bin.is_dir() and os.access(py_bin, os.W_OK):
options[":python"] = (py_bin, "used by Python")
home_local_bin = Path.home() / ".local" / "bin"
if home_local_bin.is_dir() and os.access(home_local_bin, os.W_OK):
options[":home"] = (home_local_bin, "user-specific bindir")
local_bin = Path("/usr") / "local" / "bin"
if local_bin.is_dir() and os.access(local_bin, os.W_OK):
options[":system"] = (local_bin, "system local bindir")
# Windows user
windowsapps_dir = Path(os.path.expandvars(r"%LOCALAPPDATA%\Microsoft\WindowsApps"))
if windowsapps_dir.is_dir() and os.access(windowsapps_dir, os.W_OK):
options[":windowsapps"] = (windowsapps_dir, "User App path")
# any other possible OS-specific hard-coded locations?
if not options:
raise RuntimeError("could not find any installable folders")
return options
def select_bindir(bindir, previous=None, quiet=False, is_cli=False) -> Path:
"""Resolve an install location if provided, or prompt interactive user to
select one."""
options = get_bindir_options(previous)
if len(bindir) > 1: # auto-select mode
# match one option that starts with input, e.g. :Py -> :python
sel = [opt for opt in options if opt.startswith(bindir.lower())]
if len(sel) != 1:
opt_avail = ", ".join(
f"'{opt}' for '{optpath}'" for opt, (optpath, _) in options.items()
)
raise ValueError(f"invalid option '{bindir}', choose from: {opt_avail}")
if not quiet:
print(f"auto-selecting option {sel[0]!r} for 'bindir'")
return Path(options[sel[0]][0]).resolve()
else:
if not is_cli:
opt_avail = ", ".join(
f"'{opt}' for '{optpath}'" for opt, (optpath, _) in options.items()
)
raise ValueError(f"specify the option, choose from: {opt_avail}")
ioptions = dict(enumerate(options.keys(), 1))
print("select a number to extract executables to a directory:")
for iopt, opt in ioptions.items():
optpath, optinfo = options[opt]
print(f" {iopt}: '{optpath}' -- {optinfo} ('{opt}')")
num_tries = 0
while True:
num_tries += 1
res = input("> ")
try:
opt = ioptions[int(res)]
print(f"selecting option {opt!r}")
return Path(options[opt][0]).resolve()
except (KeyError, ValueError):
if num_tries < 2:
print("invalid option, try choosing option again")
else:
raise RuntimeError("invalid option, too many attempts") from None
[docs]def run_main(
bindir,
owner=default_owner,
repo=default_repo,
release_id="latest",
ostag=None,
subset=None,
downloads_dir=None,
force=False,
quiet=False,
_is_cli=False,
):
"""Run main method to get MODFLOW and related programs.
Parameters
----------
bindir : str or Path
Writable path to extract executables. Auto-select options start with a
colon character. See error message or other documentation for further
information on auto-select options.
owner : str, default "MODFLOW-ORG"
Name of GitHub repository owner (user or organization).
repo : str, default "executables"
Name of GitHub repository. Choose one of "executables" (default),
"modflow6", or "modflow6-nightly-build".
release_id : str, default "latest"
GitHub release ID.
ostag : str, optional
Operating system tag; default is to automatically choose.
subset : list, set or str, optional
Optional subset of executables to extract, specified as a list (e.g.)
``["mfnwt", "mp6"]`` or a comma-separated string "mfnwt,mp6".
downloads_dir : str or Path, optional
Manually specify directory to download archives. Default is to use
home Downloads, if available, otherwise a temporary directory.
force : bool, default False
If True, always download archive. Default False will use archive if
previously downloaded in ``downloads_dir``.
quiet : bool, default False
If True, show fewer messages.
_is_cli : bool, default False
Control behavior of method if this is run as a command-line interface
or as a Python function.
"""
meta_path = False
prev_bindir = None
flopy_bin = False
if within_flopy:
meta_list = []
# Store metadata and possibly 'bin' in a user-writable path
if not flopy_appdata_path.exists():
flopy_appdata_path.mkdir(parents=True, exist_ok=True)
flopy_bin = flopy_appdata_path / "bin"
meta_path = flopy_appdata_path / "get_modflow.json"
meta_path_exists = meta_path.exists()
if meta_path_exists:
del_meta_path = False
try:
meta_list = json.loads(meta_path.read_text())
except (OSError, json.JSONDecodeError) as err:
print(f"cannot read flopy metadata file '{meta_path}': {err}")
if isinstance(err, OSError):
meta_path = False
if isinstance(err, json.JSONDecodeError):
del_meta_path = True
try:
prev_bindir = Path(meta_list[-1]["bindir"])
except (KeyError, IndexError):
del_meta_path = True
if del_meta_path:
try:
meta_path.unlink()
meta_path_exists = False
print(f"removed corrupt flopy metadata file '{meta_path}'")
except OSError as err:
print(f"cannot remove flopy metadata file: {err!r}")
meta_path = False
if ostag is None:
ostag = get_ostag()
if ostag == "win64par":
warnings.warn(
"The parallel build is deprecated and will no longer "
"be published: 'win64ext' replaces 'win64par'."
)
exe_suffix, lib_suffix = get_suffixes(ostag)
# select bindir if path not provided
if isinstance(bindir, str):
if bindir.startswith(":"):
bindir = select_bindir(
bindir, previous=prev_bindir, quiet=quiet, is_cli=_is_cli
) # returns resolved Path
else:
bindir = Path(bindir).resolve()
elif isinstance(bindir, Path):
bindir = bindir.resolve()
else:
raise ValueError("Invalid bindir option (expected string or Path)")
# make sure bindir exists
if bindir == flopy_bin:
if not within_flopy:
raise ValueError("option ':flopy' is only for flopy")
elif not flopy_bin.exists():
# special case option that can create non-existing directory
flopy_bin.mkdir(parents=True, exist_ok=True)
if not bindir.is_dir():
raise OSError(f"extraction directory '{bindir}' does not exist")
elif not os.access(bindir, os.W_OK):
raise OSError(f"extraction directory '{bindir}' is not writable")
# make sure repo option is valid
if repo not in available_repos:
raise KeyError(f"repo {repo!r} not supported; choose one of {available_repos}")
# get the selected release
release = get_release(owner, repo, release_id, quiet)
assets = release.get("assets", [])
for asset in assets:
asset_name = asset["name"]
if ostag in asset_name:
break
else:
raise ValueError(
f"could not find ostag {ostag!r} from release {release['tag_name']!r}; "
f"see available assets here:\n{release['html_url']}"
)
download_url = asset["browser_download_url"]
if repo == "modflow6":
asset_pth = Path(asset_name)
asset_suffix = asset_pth.suffix
dst_fname = "-".join([repo, release["tag_name"], ostag]) + asset_suffix
else:
# change local download name so it is more unique
dst_fname = "-".join([renamed_prefix[repo], release["tag_name"], asset_name])
tmpdir = None
if downloads_dir is None:
downloads_dir = Path.home() / "Downloads"
if not (downloads_dir.is_dir() and os.access(downloads_dir, os.W_OK)):
tmpdir = tempfile.TemporaryDirectory()
downloads_dir = Path(tmpdir.name)
else: # check user-defined
downloads_dir = Path(downloads_dir)
if not downloads_dir.is_dir():
raise OSError(f"downloads directory '{downloads_dir}' does not exist")
elif not os.access(downloads_dir, os.W_OK):
raise OSError(f"downloads directory '{downloads_dir}' is not writable")
download_pth = downloads_dir / dst_fname
if download_pth.is_file() and not force:
if not quiet:
print(
f"using previous download '{download_pth}' (use "
f"{'--force' if _is_cli else 'force=True'!r} to re-download)"
)
else:
if not quiet:
print(f"downloading '{download_url}' to '{download_pth}'")
urllib.request.urlretrieve(download_url, download_pth)
if subset:
if isinstance(subset, str):
subset = set(subset.replace(",", " ").split())
else:
subset = set(subset)
# Open archive and extract files
extract = set()
chmod = set()
items = []
full_path = {}
if meta_path:
from datetime import datetime
meta = {
"bindir": str(bindir),
"owner": owner,
"repo": repo,
"release_id": release["tag_name"],
"name": asset_name,
"updated_at": asset["updated_at"],
"extracted_at": datetime.now().isoformat(),
}
if subset:
meta["subset"] = sorted(subset)
with zipfile.ZipFile(download_pth, "r") as zipf:
# First gather files within internal directories named "bin"
for pth in zipf.namelist():
p = Path(pth)
if p.parent.name == "bin":
full_path[p.name] = pth
files = set(full_path.keys())
if not files:
# there was no internal "bin", so assume all files to be extracted
files = set(zipf.namelist())
code = False
if "code.json" in files and repo == "executables":
code_bytes = zipf.read("code.json")
code = json.loads(code_bytes.decode())
if meta_path:
import hashlib
code_md5 = hashlib.md5(code_bytes).hexdigest()
meta["code_json_md5"] = code_md5
if "code.json" in files:
# don't extract this file
files.remove("code.json")
if subset:
nosub = False
subset_keys = files
if code:
subset_keys |= set(code.keys())
not_found = subset.difference(subset_keys)
if not_found:
raise ValueError(
f"subset item{'s' if len(not_found) != 1 else ''} "
f"not found: {', '.join(sorted(not_found))}\n"
f"available items are:\n{columns_str(sorted(subset_keys))}"
)
else:
nosub = True
subset = set()
if code:
def add_item(key, fname, do_chmod):
if fname in files:
extract.add(fname)
items.append(f"{fname} ({code[key]['version']})")
if do_chmod:
chmod.add(fname)
else:
print(f"file {fname} does not exist")
return
for key in sorted(code):
if code[key].get("shared_object"):
fname = f"{key}{lib_suffix}"
if nosub or (subset and (key in subset or fname in subset)):
add_item(key, fname, do_chmod=False)
else:
fname = f"{key}{exe_suffix}"
if nosub or (subset and (key in subset or fname in subset)):
add_item(key, fname, do_chmod=True)
# check if double version exists
fname = f"{key}dbl{exe_suffix}"
if (
code[key].get("double_switch", True)
and fname in files
and (nosub or (subset and (key in subset or fname in subset)))
):
add_item(key, fname, do_chmod=True)
else:
# releases without code.json
for fname in sorted(files):
if nosub or (subset and fname in subset):
if full_path:
extract.add(full_path[fname])
else:
extract.add(fname)
items.append(fname)
if not fname.endswith(lib_suffix):
chmod.add(fname)
if not quiet:
print(
f"extracting {len(extract)} "
f"file{'s' if len(extract) != 1 else ''} to '{bindir}'"
)
zipf.extractall(bindir, members=extract)
# If this is a TemporaryDirectory, then delete the directory and files
del tmpdir
if full_path:
# move files that used a full path to bindir
rmdirs = set()
for fpath in extract:
fpath = Path(fpath)
bindir_path = bindir / fpath
bindir_path.replace(bindir / fpath.name)
rmdirs.add(fpath.parent)
# clean up directories, starting with the longest
for rmdir in sorted(rmdirs, reverse=True):
bindir_path = bindir / rmdir
bindir_path.rmdir()
for subdir in rmdir.parents:
bindir_path = bindir / subdir
if bindir_path == bindir:
break
shutil.rmtree(str(bindir_path))
if "win" not in ostag:
# similar to "chmod +x fname" for each executable
for fname in chmod:
pth = bindir / fname
pth.chmod(pth.stat().st_mode | 0o111)
# Show listing
if not quiet:
if any(items):
print(columns_str(items))
if not subset:
if full_path:
extract = {Path(fpth).name for fpth in extract}
unexpected = extract.difference(files)
if unexpected:
print(f"unexpected remaining {len(unexpected)} files:")
print(columns_str(sorted(unexpected)))
# Save metadata, only for flopy
if meta_path:
if "pytest" in str(bindir) or "pytest" in sys.modules:
# Don't write metadata if this is part of pytest
print("skipping writing flopy metadata for pytest")
return
meta_list.append(meta)
if not flopy_appdata_path.exists():
flopy_appdata_path.mkdir(parents=True, exist_ok=True)
try:
meta_path.write_text(json.dumps(meta_list, indent=4) + "\n")
except OSError as err:
print(f"cannot write flopy metadata file: '{meta_path}': {err!r}")
if not quiet:
if meta_path_exists:
print(f"updated flopy metadata file: '{meta_path}'")
else:
print(f"wrote new flopy metadata file: '{meta_path}'")
def cli_main():
"""Command-line interface."""
import argparse
# Show meaningful examples at bottom of help
prog = Path(sys.argv[0]).stem
if sys.platform.startswith("win"):
drv = Path("c:/")
else:
drv = Path("/")
example_bindir = drv / "path" / "to" / "bin"
examples = f"""\
Examples:
Install executables into an existing '{example_bindir}' directory:
$ {prog} {example_bindir}
Install a development snapshot of MODFLOW 6 by choosing a repo:
$ {prog} --repo modflow6-nightly-build {example_bindir}
"""
if within_flopy:
examples += f"""\
FloPy users can install executables using a special option:
$ {prog} :flopy
"""
parser = argparse.ArgumentParser(
description=__doc__.split("\n")[0],
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=examples,
)
bindir_help = (
"Directory to extract executables. Use ':' to interactively select an "
"option of paths. Other auto-select options are only available if the "
"current user can write files. "
)
if within_flopy:
bindir_help += (
"Option ':prev' is the previously used 'bindir' path selection. "
"Option ':flopy' will create and install programs for FloPy. "
)
if sys.platform.startswith("win"):
bindir_help += (
"Option ':python' is Python's Scripts directory. "
"Option ':windowsapps' is "
"'%%LOCALAPPDATA%%\\Microsoft\\WindowsApps'."
)
else:
bindir_help += (
"Option ':python' is Python's bin directory. "
"Option ':home' is '$HOME/.local/bin'. "
"Option ':system' is '/usr/local/bin'."
)
parser.add_argument("bindir", help=bindir_help)
parser.add_argument(
"--owner",
type=str,
default=default_owner,
help=f"GitHub repository owner; default is '{default_owner}'.",
)
parser.add_argument(
"--repo",
choices=available_repos,
default=default_repo,
help=f"Name of GitHub repository; default is '{default_repo}'.",
)
parser.add_argument(
"--release-id",
default="latest",
help="GitHub release ID; default is 'latest'.",
)
parser.add_argument(
"--ostag",
help="Operating system tag; default is to automatically choose.",
)
parser.add_argument(
"--subset",
help="Subset of executables to extract, specified as a "
"comma-separated string, e.g. 'mfnwt,mp6'.",
)
parser.add_argument(
"--downloads-dir",
help="Manually specify directory to download archives.",
)
parser.add_argument(
"--force",
action="store_true",
help="Force re-download archive. Default behavior will use archive if "
"previously downloaded in downloads-dir.",
)
parser.add_argument("--quiet", action="store_true", help="Show fewer messages.")
args = vars(parser.parse_args())
try:
run_main(**args, _is_cli=True)
except (EOFError, KeyboardInterrupt):
sys.exit(f" cancelling '{sys.argv[0]}'")
if __name__ == "__main__":
"""Run command-line interface, if run as a script."""
cli_main()