rhodiumgroup / rhg_compute_tools
Tools for using compute.rhg.com and compute.impactlab.org
License: MIT License
Provide functions for more semantic use of client.map and other dask workflows
import functools, itertools
def expand(func):
'''
Decorator to expand an (args, kwargs) tuple in function calls
Intended for use with the :py:func:`collapse` function
Parameters
----------
func : function
Function to have arguments expanded. Func can have any
number of positional and keyword arguments.
Returns
-------
wrapped : function
Wrapped version of ``func`` which accepts a single
``(args, kwargs)`` tuple.
Examples
--------
.. code-block:: python
In [1]: @expand
...: def my_func(a, b, exp=1):
...: return (a * b)**exp
...:
In [2]: my_func(((2, 3), {}))
6
In [3]: my_func(((2, 3, 2), {}))
36
In [4]: my_func((tuple([]), {'b': 4, 'exp': 2, 'a': 1}))
16
This function can be used in combination with the ``collapse`` helper function,
which allows more natural parameter calls
.. code-block:: python
In [5]: my_func(collapse(2, 3, exp=2))
36
These can then be paired to enable many parameterized function calls:
.. code-block:: python
In [6]: func_calls = [collapse(a, a+1, exp=a) for a in range(5)]
In [7]: list(map(my_func, func_calls))
[1, 2, 36, 1728, 160000]
'''
@functools.wraps(func)
def inner(ak):
return func(*ak[0], **ak[1])
return inner
def collapse(*args, **kwargs):
'''
Collapse positional and keyword arguments into an (args, kwargs) tuple
Intended for use with the :py:func:`expand` decorator
Parameters
----------
*args
Variable length argument list.
**kwargs
Arbitrary keyword arguments.
Returns
-------
args : tuple
Positional arguments tuple
kwargs : dict
Keyword argument dictionary
'''
return (args, kwargs)
def collapse_product(*args, **kwargs):
'''
Collapse iterables of positional and keyword arguments into an iterator over (args, kwargs) tuples, one per element of their product
Parameters
----------
*args
Variable length list of iterables
**kwargs
Keyword arguments, whose values must be iterables
Returns
-------
iterator
Generator with collapsed arguments
See Also
--------
Function :py:func:`collapse`
Examples
--------
.. code-block:: python
In [1]: @expand
...: def my_func(a, b, exp=1):
...: return (a * b)**exp
...:
In [3]: product_args = collapse_product(
...: [0, 1, 2],
...: [0.5, 2],
...: exp=[0, 1])
In [4]: list(product_args)  # doctest: +NORMALIZE_WHITESPACE
[
((0, 0.5), {'exp': 0}),
((0, 0.5), {'exp': 1}),
((0, 2), {'exp': 0}),
((0, 2), {'exp': 1}),
((1, 0.5), {'exp': 0}),
((1, 0.5), {'exp': 1}),
((1, 2), {'exp': 0}),
((1, 2), {'exp': 1}),
((2, 0.5), {'exp': 0}),
((2, 0.5), {'exp': 1}),
((2, 2), {'exp': 0}),
((2, 2), {'exp': 1})]
In [5]: product_args = collapse_product(
   ...: [0, 1, 2],
   ...: [0.5, 2],
   ...: exp=[0, 1])
In [6]: list(map(my_func, product_args))
[1.0, 0.0, 1, 0, 1.0, 0.5, 1, 2, 1.0, 1.0, 1, 4]
'''
num_args = len(args)
kwarg_keys = list(kwargs.keys())
kwarg_vals = [kwargs[k] for k in kwarg_keys]
def format_iterations(x):
    return (tuple(x[:num_args]), dict(zip(kwarg_keys, x[num_args:])))
return map(format_iterations, itertools.product(*args, *kwarg_vals))
# forward compatibility
import dask.distributed as dd

try:
    from tqdm.auto import tqdm
except ImportError:
    from tqdm import tqdm_notebook as tqdm
# progress bar that blocks until completion
def blocking_pbar(futures, **kwargs):
pbar = tqdm(dd.as_completed(futures), total=len(futures), **kwargs)
for _ in pbar:
errors = [f for f in futures if f.status == 'error']
if len(errors) > 0:
pbar.set_postfix({'errors': len(errors)})
I'm just going to plug in the changelog from https://github.com/RhodiumGroup/rhg_compute_tools/releases.
Right now the docs are all boilerplate.
Currently, collapse_product only allows combinatorial products of lists of options. But we frequently have associated groups of options, e.g.:
rcp_related = (
[
{
'rcp': 'historical',
'model': 'obs'}]
+ [{'rcp': 'rcp85', 'model': m} for m in rcp85_models]
+ [{'rcp': 'rcp45', 'model': m} for m in rcp45_models])
There's not a good way to do this right now with collapse_product.
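In the meantime, a minimal sketch of a workaround, assuming the collapse/expand helpers above; collapse_product_groups is a hypothetical name and the model lists are placeholders:

import itertools

def collapse_product_groups(*groups):
    # each group is a list of kwargs dicts that belong together;
    # yield ((), merged_kwargs) tuples, equivalent to collapse(**merged)
    for combo in itertools.product(*groups):
        merged = {}
        for kwargs in combo:
            merged.update(kwargs)
        yield ((), merged)

# placeholder model lists, for illustration only
rcp85_models = ['ACCESS1-0', 'CCSM4']
rcp45_models = ['ACCESS1-0']

rcp_related = (
    [{'rcp': 'historical', 'model': 'obs'}]
    + [{'rcp': 'rcp85', 'model': m} for m in rcp85_models]
    + [{'rcp': 'rcp45', 'model': m} for m in rcp45_models])

ssps = [{'ssp': s} for s in ['SSP2', 'SSP3']]

# each element is ((), {'rcp': ..., 'model': ..., 'ssp': ...}), ready to be
# mapped over an @expand-decorated function with client.map
job_args = list(collapse_product_groups(rcp_related, ssps))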
Would be great to do something along the lines of
rhg-compute-tools gcs rsync src dst
which suggests we probably need a broader command line strategy for handling different submodules.
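A minimal sketch of one possible layout, assuming a click-based console entry point; the command and group names here are made up, not something the package currently provides:

import click

@click.group()
def cli():
    """rhg_compute_tools command line interface."""

@cli.group()
def gcs():
    """Google Cloud Storage helpers."""

@gcs.command()
@click.argument('src')
@click.argument('dst')
def rsync(src, dst):
    """Sync SRC to DST (would wrap gsutil -m rsync under the hood)."""
    click.echo('would sync {} -> {}'.format(src, dst))

if __name__ == '__main__':
    cli()

Each submodule (gcs, kubernetes, etc.) would get its own sub-group, which keeps the top-level command stable as new helpers are added.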
Paraphrased from @delgadom in #84:
It would be cool to combine our rhg_compute_tools.xarray *_from_delayed functions with the new xarray combine functions so you could combine based on coords or auto-combine. Or just drop the dataarray_ and dataset_ from_delayed functions, provide dataarrays_ and datasets_ functions instead, and point users to these concat functions.
Workflow would just be:
futures = [ ... ] # flat list of dataarray futures with arbitrary non-overlapping coordinate relationships
da = xr.combine_by_coords(rhgx.dataarrays_from_delayed(futures))
futures = [[...], [...], ...] # nested list of dataarrays with hierarchical structures
da = xr.combine_nested(rhgx.dataarrays_from_delayed(futures))
futures = [ ... ] # ordered flat list of dataarray futures with overlapping coordinate relationships
da = functools.reduce(lambda x, y: x.combine_first(y), rhgx.dataarrays_from_delayed(futures))
that's basically it
Cluster creation helpers don't trigger a worker build on scale
From https://pypi.org/project/pytest-runner/:
pytest-runner depends on deprecated features of setuptools and relies on features that break security mechanisms in pip. For example ‘setup_requires’ and ‘tests_require’ bypass pip --require-hashes. See also pypa/setuptools#1684.
It is recommended that you:
- Remove ‘pytest-runner’ from your ‘setup_requires’, preferably removing the setup_requires option.
- Remove ‘pytest’ and any other testing requirements from ‘tests_require’, preferably removing the tests_require option.
- Select a tool to bootstrap and then run tests such as tox.
I don't think dropping this would radically change the package, or how tests are run. It's just a matter of carefully removing it and everything created for it.
Not sure exactly where these should go. But here are some useful kubectl templates
Calls python to search for directories on all pods matching a search criterion:
kubectl -n NAMESPACE get pods | grep SEARCH_CRITERIA | awk '{ print $1 }' | xargs -I@ kubectl exec -n NAMESPACE @ -- /opt/conda/envs/worker/bin/python -c 'import sys; import os; matches=[f for f in os.listdir("PATH_TO_SEARCH_DIR") if f in [DIRNAME_OR_LIST_OF_DIRNAMES]]; print((str(sys.argv[1]) + ": " + str(matches) + "\n") if len(matches) > 0 else "", end="");' @
For example, this finds specific geoclaw run directories on test-hub pods matching the pattern "dask-delgadom*":
kubectl -n test-hub get pods | grep dask-delgadom | awk '{ print $1 }' | xargs -I@ kubectl exec -n test-hub @ -- /opt/conda/envs/worker/bin/python -c 'import sys; import os; matches=[f for f in os.listdir("/") if f in ["gfdl5_rcp45_2007_2025_1825", "miroc5_rcp45_2007_2025_1083", "ccsm4_rcp85_2007_2025_626", "ccsm4_rcp85_2007_2025_1035", "mpi5_rcp45_2055_2065_668"]]; print((str(sys.argv[1]) + ": " + str(matches) + "\n") if len(matches) > 0 else "", end="");' @
Deletes all pods in a namespace whose status is Completed:
kubectl -n NAMESPACE get pods | grep Completed | awk '{ print $1 }' | xargs kubectl -n NAMESPACE delete pods
For example, on the main compute.rhg deployment:
kubectl -n rhodium-jupyter get pods | grep Completed | awk '{ print $1 }' | xargs kubectl -n rhodium-jupyter delete pods
Looks like they're out of date, e.g. pre-alpha development and Python 2 support.
These were addressed in #71, which we are closing now so that we can first merge a purely black-only PR; we will then file a PR from the autofmt branch that specifically addresses these bugs and is easier to review.
Evaluate this: https://automating-gis-processes.github.io/site/notebooks/L3/nearest-neighbor-faster.html
Then maybe build it into some of our workflows and/or generally enable it for people.
Allow cred_name='rhg_data' instead of cred_path='/opt/gcsfuse_tokens/rhg-data.json'.
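A minimal sketch of the mapping this implies, assuming credentials keep living under /opt/gcsfuse_tokens/<name>.json and underscores map to hyphens as in the example above (the helper name is hypothetical):

def cred_path_from_name(cred_name):
    # 'rhg_data' -> '/opt/gcsfuse_tokens/rhg-data.json'
    return '/opt/gcsfuse_tokens/{}.json'.format(cred_name.replace('_', '-'))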
I'm not 100% positive I wrote the cp command correctly to handle cases where you are copying directories to other directories. The rules for exactly how the copy is conducted, depending on whether or not you specify -r, whether you are using single or double wildcard characters, and whether you are copying files or directories, are quite complicated. It's making my head spin and so I gave up trying to fix things for now. sync is often a simpler and easier option b/c you know you are syncing a directory to a directory. But at some point, we should take a look at cp and make sure it's working appropriately.
Have no mention of replicate_directory_structure_on_gcs() in the changelog (errr, HISTORY) and that stuff is 🔥 so users should know about it. This is my bad.
Using standard linux filesystem tools with the mounted /gcs volume is very slow for many files. Write a wrapper for gsutil -m cp -r to do this copying much faster.
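A rough sketch of such a wrapper, assuming gsutil is available on the PATH; the function name, flags argument, and paths in the usage comment are made up for illustration:

import subprocess

def gsutil_cp(src, dst, recursive=True, extra_flags=()):
    # use gsutil's parallel (-m) copy instead of going through the /gcs mount
    cmd = ['gsutil', '-m', 'cp']
    if recursive:
        cmd.append('-r')
    cmd.extend(extra_flags)
    cmd.extend([src, dst])
    # raises CalledProcessError if the copy fails
    return subprocess.run(cmd, check=True, capture_output=True)

# e.g. gsutil_cp('/home/jovyan/outputs', 'gs://rhg-data/outputs/')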
We get rapid increases and decreases in the number of workers. Also, this doesn't play nice with multiple threads.
Right now we have cp_gcs, sync_gcs, and rm. In addition, each one takes src, dest, and e.g. cp_flags or sync_flags. We should make this consistent and just have cp, sync, and rm, each of which takes src, dest, and flags.
add a deprecation warning and gradually phase out rhg_compute_tools.utils in favor of https://github.com/ClimateImpactLab/parameterize_jobs
Right now we have these punk docstrings indicating deprecation. Might be good to trigger literal warnings about these. Would also make it easier to keep track of pending deprecations.
See https://docs.python.org/3/library/warnings.html.
Note the difference in target audience for choosing to use a FutureWarning or DeprecationWarning.
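A minimal sketch of what triggering a real warning could look like; the deprecated decorator below is hypothetical, not something the package already has:

import functools
import warnings

def deprecated(replacement):
    def decorator(func):
        @functools.wraps(func)
        def inner(*args, **kwargs):
            warnings.warn(
                '{} is deprecated; use {} instead'.format(func.__name__, replacement),
                # FutureWarning targets end users; DeprecationWarning targets other developers
                FutureWarning,
                stacklevel=2,
            )
            return func(*args, **kwargs)
        return inner
    return decorator

@deprecated('parameterize_jobs')
def collapse_product(*args, **kwargs):
    ...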
I'm trying to set an environment variable on a remote cluster. It appears that the dictionary implementation assigning values to names is slightly off (values() should be items()). Also, the handling for the deprecated list-of-dicts implementation has a typo (isintance should be isinstance). I am submitting a PR to resolve both issues.
client, cluster = rhg_compute_tools.kubernetes.get_standard_cluster(
env_items = {
'FOO':'/path/to/bar'
}
)
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
<ipython-input-2-18c512dee399> in <module>
1 client, cluster = rhg_compute_tools.kubernetes.get_standard_cluster(
2 env_items = {
----> 3 'FOO':'/path/to/bar'
4 }
5 )
~/rhg_compute_tools/rhg_compute_tools/kubernetes.py in get_standard_cluster(*args, **kwargs)
316 """
317
--> 318 return get_cluster(*args, scaling_factor=1, **kwargs)
319
320
~/rhg_compute_tools/rhg_compute_tools/kubernetes.py in get_cluster(name, extra_pip_packages, extra_conda_packages, memory_gb, nthreads, cpus, cred_name, cred_path, env_items, scaling_factor, dask_config_dict, deploy_mode, idle_timeout, template_path, extra_worker_labels, extra_pod_tolerations, keep_default_tolerations, **kwargs)
221 [
222 container["env"].append({"name": k, "value": v})
--> 223 for k, v in env_items.values()
224 ]
225 # allow deprecated passing of list of name/value pairs
~/rhg_compute_tools/rhg_compute_tools/kubernetes.py in <listcomp>(.0)
221 [
222 container["env"].append({"name": k, "value": v})
--> 223 for k, v in env_items.values()
224 ]
225 # allow deprecated passing of list of name/value pairs
ValueError: too many values to unpack (expected 2)
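For reference, a sketch of the corrected block with the one-word change described above (container stands in for the worker container spec built in kubernetes.py, per the traceback):

container = {"env": []}  # stand-in for the worker container spec
env_items = {'FOO': '/path/to/bar'}
for k, v in env_items.items():  # was env_items.values(), which can't unpack into (k, v)
    container["env"].append({"name": k, "value": v})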
Tools for exporting directories of results & a readme into single excel files
import os, glob
import numpy as np
import pandas as pd
import click
import docutils.nodes
import docutils.parsers.rst
import docutils.utils
SHEET_FORMATS = {}
def parse_rst(text):
parser = docutils.parsers.rst.Parser()
components = (docutils.parsers.rst.Parser,)
settings = (
docutils
.frontend
.OptionParser(components=components)
.get_default_values())
document = docutils.utils.new_document('<rst-doc>', settings=settings)
parser.parse(text, document)
return document
def rst_walker(sheet_writer, section, level=0, row=0, col=0):
if not hasattr(section, 'children'):
sheet_writer.write(row, col, section.astext().strip('\n'))
row += 1
return row
for block in section.children:
if hasattr(block, 'tagname') and (block.tagname == 'title'):
row += 1
fmt = SHEET_FORMATS.get(level, None)
sheet_writer.write(row, col, block.astext().strip('\n'), fmt)
row += 1
elif (
hasattr(block, 'tagname')
and (
(block.tagname == 'paragraph')
or (block.tagname == '#text')
or (level > 3))):
fmt = SHEET_FORMATS.get('text', None)
sheet_writer.write(
row,
col,
block.astext().strip('\n').replace('\n', ' '),
fmt)
row += 1
elif hasattr(block, 'tagname') and (block.tagname == 'bullet_list'):
fmt = SHEET_FORMATS.get('text', None)
for li in block.children:
sheet_writer.write(
row,
col,
'• ' + li.astext().strip('\n').replace('\n', ' '),
fmt)
row += 1
elif hasattr(block, 'tagname') and (block.tagname == 'enumerated_list'):
fmt = SHEET_FORMATS.get('text', None)
for i, li in enumerate(block.children):
sheet_writer.write(
row,
col,
'{}. '.format(i + 1) + li.astext().strip('\n').replace('\n', ' '),
fmt)
row += 1
else:
row = rst_walker(sheet_writer, block, level + 1, row=row, col=col)
return row
def readme_to_excel(readme_path, excel_writer, sheet_name='README', start_row=2, start_col=1):
with open(readme_path, 'r') as f:
doc = parse_rst(f.read())
workbook = excel_writer.book
worksheet = excel_writer.book.add_worksheet(sheet_name)
worksheet.set_column(1, 2, 60.)
SHEET_FORMATS.update({
1: workbook.add_format({
'bold': True,
'text_wrap': False,
'valign': 'top',
'font_size': 18}),
2: workbook.add_format({
'bold': True,
'text_wrap': False,
'valign': 'top',
'font_size': 14}),
3: workbook.add_format({
'bold': True,
'text_wrap': False,
'valign': 'top',
'font_size': 12}),
'text': workbook.add_format({
'bold': False,
'text_wrap': True,
'valign': 'top',
'font_size': 11})})
rst_walker(worksheet, doc, row=start_row, col=start_col)
def variable_to_excel(varname, root_dir, agglev, by='by_', scen=False, geog_cols=1, file_var=None):
writer = pd.ExcelWriter(
os.path.join(root_dir, f'{by}{agglev}', f'{file_var}_{agglev}.xlsx'),
engine='xlsxwriter')
option1 = os.path.join(
root_dir,
f'{by}{agglev}',
f'{file_var}_README.txt')
option2 = os.path.join(root_dir, f'{by}{agglev}', 'readme.txt')
readme_to_excel(
option1 if os.path.isfile(option1) else option2,
writer)
hist_file = os.path.join(
root_dir,
f'{by}{agglev}',
f'{file_var}_{agglev}_historical_1970-1990.csv')
if os.path.isfile(hist_file):
hist_data = pd.read_csv(hist_file).rename_axis('_INDEX')
hist_data = hist_data.set_index(
list(np.array(list(hist_data.columns))[list(range(geog_cols))]),
append=True)
hist = pd.concat(
{'historical': pd.concat(
{'1970-1990': pd.concat(
{'observed': hist_data},
names=['likelihood'])},
names=['period'])},
names=['scenario'])
(
hist
.xs('1970-1990', level='period')
.unstack(['scenario', 'likelihood'])
.reset_index('_INDEX', drop=True)
.to_excel(writer, sheet_name='1970-1990'))
dfs = {}
periods = ['2010-2030', '2020-2040', '2040-2060', '2060-2080', '2080-2100']
if scen:
rcps = ['expected-emissions', 'high-emissions']
else:
rcps = ['rcp45', 'rcp85']
for rcp in rcps:
dfp = {}
for period in periods:
fp = os.path.join(
root_dir,
f'{by}{agglev}',
f'{file_var}_{agglev}_{rcp}_{period}.csv')
if os.path.isfile(fp):
df = pd.read_csv(fp)
df = df.set_index(pd.Index(
np.hstack([np.arange(len(df)//5) for _ in range(5)]),
name='_INDEX'))
df = df.set_index(
list(df.columns.values[list(range(geog_cols + 1))]),
append=True)
dfp[period] = df
else:
print('nooop: {}'.format(fp))
if len(dfp) > 0:
dfs[rcp] = pd.concat(dfp, names=['period'])
proj = pd.concat(dfs, names=['scenario'])
proj.index.set_names('likelihood', level='quantile', inplace=True)
for period in proj.index.get_level_values('period').unique():
(
proj
.xs(period, level='period')
.unstack(['scenario', 'likelihood'])
.reset_index('_INDEX', drop=True)
.to_excel(writer, sheet_name=period))
writer.save()
@click.command()
@click.argument('varname')
@click.argument('root_dir')
@click.argument('agglev')
@click.option('--by', default='by_', help='Optional prefix for agglev (default "by_{AGGLEV}")')
@click.option('--scen/--no-scen', default=False, is_flag=True, help=(
'Use scenario names (e.g. high-emissions). Default is '
'to use rcp names (e.g. rcp85). This should reflect the '
'input file names/contents... the output will match the '
'inputs.'))
@click.option('--geog_cols', default=1, help='number of index columns to read in geography (default 1)', type=int)
@click.option('--file-var', default=None, help='varname used in csv file names (default VARNAME)')
def to_excel(varname, root_dir, agglev, by='by_', scen=False, geog_cols=1, file_var=None):
'''
Converts a standard Rhodium Climate Risk Service output csv+readme
directory into a single excel file.
Accepts as arguments the variable name (used in filenames), root directory
of data (containing agglev directories), and the regional aggregation level
(e.g. county, cbsa)
Example usage:
python build_excel.py total-economic-impact-as-share-of-GDP . cbsa
'''
if file_var is None:
file_var = varname
variable_to_excel(
varname, root_dir, agglev, by=by, scen=scen, geog_cols=geog_cols, file_var=file_var)
if __name__ == "__main__":
to_excel()
Daniel used this function (with recursive=True) to remove a "directory" with many sub-directories and files within it. It removed it from GCSFUSE but was still visible on the cloud console.
Additionally, I wrote this function to use the python API but sounds like calling gsutil would be faster due to parallelization.
So we should just rewrite this with gsutil.
def add_fuse_directory_markers_to_cloud_storage(client, bucket_name, root_path="", pbar=True):
"""
Create gcsfuse directory markers from a bucket and root path
Parameters
-----------
client : google.cloud.storage.Client
See the [google.cloud.storage.Client](https://googleapis.dev/python/storage/latest/client.html) docs for help setting this up.
bucket_name : str
name of the bucket on gcs
root_path : str, optional
prefix of "directories" below which to create the directory markers
Examples
---------
The following will create directory markers for all directories within gs://my-bucket/path/to/root,
where directories are indicated by the presence of blobs with directory separators (`'/'`) in the
path. Empty directories will not be created, since these cannot exist on google cloud storage.
.. code-block:: python
>>> client = google.cloud.storage.Client.from_service_account_json('/path/to/cred.json')
>>> add_fuse_directory_markers_to_cloud_storage(client, 'my-bucket', 'path/to/root/')
"""
bucket = client.bucket(bucket_name)
blobs = bucket.list_blobs(prefix=root_path)
pages = blobs.pages
if pbar:
progress_bar = tqdm(pages)
total_items = 0
directories = set()
for page in pages:
if pbar:
total_items += page.num_items
progress_bar.total = total_items
progress_bar.refresh()
for blob in page:
if pbar:
progress_bar.update()
dirname = os.path.dirname(blob.name).rstrip("\\/") + "/"
if dirname not in directories:
dir_blob = bucket.blob(dirname)
if not dir_blob.exists():
dir_blob.upload_from_string(b"")
directories.add(dirname)
if pbar:
progress_bar.close()
it's still too janky for a PR, but it works for now...
Allow specifying pod labels & tolerations. Right now this must be done with a pod template, e.g.:
metadata:
labels:
key1: val1
key2: val2
spec:
tolerations:
- effect: "NoSchedule"
key: "k8s.dask.org_dedicated"
operator: "Equal"
value: "worker-highcpu"
- effect: "NoSchedule"
key: "k8s.dask.org/dedicated"
operator: "Equal"
value: "worker-highcpu"
Given a list of futures (partially completed) and a cluster, should be easy to:
These should be tested on RHG hub, and, if stable/useful, submitted as PRs to dask_kubernetes.
class FutureNotFoundError(KeyError):
pass
class PodNotFoundError(KeyError):
pass
def get_worker_ip_from_future(future, cluster):
try:
return list(cluster.scheduler.who_has[future.key])[0]
except KeyError:
pass
for w in cluster.scheduler.workers.values():
if (future in w.processing) or (future.key in [t.key for t in w.has_what]):
return w.address
raise FutureNotFoundError('task {} not found in cluster tasks'.format(future))
def get_pod_from_ip(ip, cluster):
ip = ip.split('://')[-1].split(':')[0]
for p in cluster.pods():
if p.status.pod_ip == ip:
return p
raise PodNotFoundError('No pod found with IP address {}'.format(ip))
There have to be faster ways of doing this, but I don't see them in the dask_kubernetes docs. They're admittedly very hacky though.
The next steps of executing things on the pods could be done from the command line, but using the kubernetes api would obviously be more elegant. Pods returned by get_pod_from_ip have a bunch of useful info. It's possible we could get the appropriately configured/authenticated kubernetes client from the dask cluster or client and execute things on this. The kubernetes package has some examples of calling exec using the kubernetes api.
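A minimal sketch of the exec route using the kubernetes python client, assuming kubeconfig access to the cluster; pod is an object returned by get_pod_from_ip above, and the helper name is made up:

from kubernetes import client as k8s_client, config as k8s_config
from kubernetes.stream import stream

def exec_on_pod(pod, command):
    # command is a list of strings, e.g. ['/bin/sh', '-c', 'ls /']
    k8s_config.load_kube_config()
    core_api = k8s_client.CoreV1Api()
    return stream(
        core_api.connect_get_namespaced_pod_exec,
        pod.metadata.name,
        pod.metadata.namespace,
        command=command,
        stderr=True, stdin=False, stdout=True, tty=False)

# e.g. exec_on_pod(pod, ['/bin/sh', '-c', 'ls /'])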
you can't currently install this thing
Package and docs mention Python 2 support here and there. The code def can't run in Python 2, so this should be scrubbed and cleaned away.
See #46 for starters.
this thing is complicated. let's show people how to use it!
Trying to load a dataarray from futures blew up my local machine's memory. It occurred when gathering da.coords but did not seem to occur if we changed to gathering dict(da.coords).
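A sketch of where that change would land in dataarrays_from_delayed (the full function appears further down this page), assuming this is the coords gather the issue refers to; the only difference is wrapping coords in dict():

array_metadata = client.gather(
    client.map(
        lambda x: {'dims': x.dims, 'coords': dict(x.coords), 'attrs': x.attrs},
        futures))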
In theory, you could do this with https://github.com/rancher/k3s. This would allow for some better testing of the get_cluster-related commands. For example, right now the _get_cluster_dask_gateway unit test creates a dask gateway server with a local backend. The local backend doesn't permit the same config options as a kubernetes backend, so not all of our config options are testable. But if we made a local k8s cluster and then spun up the gateway server on that, they would be. I'd imagine there are some other existing or planned functions that could make use of this for unit testing.
Might be overkill... but could be useful
If you set up a session and cache credentials using the gcloud or gsutil tools, we should be able to pull from these by simply creating a client
We need to add package dependencies and clean up internal imports. See https://travis-ci.org/RhodiumGroup/rhg_compute_tools.
Spin up for a cluster and (optionally) wait for workers to appear, with a progress bar. Optionally use as a context manager to spin down cluster after job execution.
import time

import dask.distributed as dd
import rhg_compute_tools.kubernetes as rhgk

try:
    from tqdm.auto import tqdm
except ImportError:
    from tqdm import tqdm_notebook as tqdm
class setup_cluster(object):
'''
Scales up a dask cluster, with the option to block until the workers are available
If ``setup_cluster`` is used as a context manager, the workers will be spun down
upon exit. Note that this does not automatically wait for all tasks to be
completed, so care should be taken to ensure all jobs will block until the context
is exited. Because the workers will be spun down after job completion,
debugging tasks can be difficult. Therefore, it's recommended that this not be
used for prototyping, and that tasks have their own mechanism for error
handling and possibly reporting when using ``setup_cluster`` this way.
Parameters
----------
nworkers : int
Number of workers to create
cluster_creator : func, optional
Cluster creation function. Object returned must have ``scale`` and ``close``
methods. Default is :py:func:`rhg_compute_tools.kubernetes.get_cluster`.
cluster_kwargs : dict, optional
Keyword arguments passed to ``cluster_creator``. Default ``{}``
block : bool, optional
Whether to block until all workers have come online. Default ``True``.
pbar : bool, optional
Whether to create a tqdm progress bar displaying worker spinup. Ignored
if ``block`` is ``False``. Default ``True``.
pbar_kwargs : dict, optional
Keyword arguments passed to :py:func:`tqdm.auto.tqdm`
Examples
--------
Can be used as a helper to scale up workers:
.. code-block:: python
>>> s = setup_cluster(10)
>>> client, cluster = s.scale_and_wait_for_workers() # doctest: +SKIP
100%|██████████| 10/10 [00:12<00:00, 21.72s/it]
>>> futures = client.map(lambda x: x**2, range(20)) # doctest: +SKIP
Alternatively, can be used as a context manager:
.. code-block:: python
>>> with setup_cluster(10, pbar_kwargs={'desc': 'workers'}) as (client, cluster):
... futures = client.map(lambda x: x**2, range(20))
... wait_for_futures(
... futures,
... pbar_kwargs={'desc': 'jobs'}) # doctest: +SKIP
...
workers: 100%|██████████| 10/10 [00:12<00:00, 1.20s/it]
jobs: 100%|██████████| 10/10 [00:01<00:00, 9.83it/s]
'''
def __init__(
self,
nworkers,
cluster_creator=None,
cluster_kwargs=None,
block=True,
pbar=True,
pbar_kwargs=None):
self.nworkers = nworkers
self.cluster_creator = (
cluster_creator if cluster_creator is not None else rhgk.get_cluster)
self.cluster_kwargs = cluster_kwargs if cluster_kwargs is not None else {}
self.block = block
self.pbar = pbar
self.pbar_kwargs = pbar_kwargs if pbar_kwargs is not None else {}
def scale_and_wait_for_workers(self):
self.client, self.cluster = self.cluster_creator(**self.cluster_kwargs)
self.cluster.scale(self.nworkers)
if self.block:
    pbar = None
    if self.pbar:
        pbar = tqdm(total=self.nworkers, **self.pbar_kwargs)
    while True:
        nworkers = len(self.client.ncores().values())
        if pbar is not None:
            pbar.n = nworkers
            pbar.refresh()
        if nworkers >= self.nworkers:
            break
        time.sleep(0.2)
    if pbar is not None:
        pbar.close()
return self.client, self.cluster
def __enter__(self):
return self.scale_and_wait_for_workers()
def __exit__(self, *args, **kwargs):
self.cluster.scale(0)
self.client.close()
self.cluster.close()
Wait for futures to complete, with a progress bar
def wait_for_futures(futures, pbar_kwargs=None):
'''
Blocking progress bar for dask futures
Provides a progress bar which will block the python interpreter until
all futures are completed
Parameters
----------
futures : list or dask object
list of dask futures objects, or a dask collection such as a
Dataframe or Array object with a dask attribute
pbar_kwargs : dict, optional
Keyword arguments passed to the tqdm.auto.tqdm constructor
'''
if pbar_kwargs is None:
pbar_kwargs = {}
if hasattr(futures, 'dask'):
futures = futures.dask.values()
pbar = tqdm(dd.as_completed(futures), total=len(futures), **pbar_kwargs)
errors = 0
for f in pbar:
if f.status == 'error':
errors += 1
pbar.set_postfix({'errors': errors})
Our CI pipeline just broke, and it's b/c matplotlib just released v3.4.0. In this version there is no more _rebuild() function within the matplotlib.font_manager module. We call this function here:
It looks like the font loading lines above L129 are all commented out, so do we need this anymore? It wasn't immediately clear to me why matplotlib took it out or what the proper way to rebuild the font cache is now, but if we don't need it anymore we could remove that line. Until then, it's blocking, as it breaks our CI.
@delgadom do you know if we still need that _rebuild call?
In order to keep things clean, we could add the following commands to our tests:
isort --profile black -c .
black --check .
isort just makes imports slightly easier to follow (it orders them by type of package and alphabetically, splits them into groups, and deletes duplicate imports). Not a big deal, but why not make things a little cleaner.
For large client.map workflows with long-running tasks, it would be nice to have a process we could spin off to shut down workers as tasks complete, when there are no tasks waiting and there are workers sitting idle.
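A very rough sketch of the idea, using the public distributed.Client API (processing() and retire_workers()); retiring only workers with nothing currently assigned is an approximation of "idle", and the function is meant to be spun off in a background thread or process:

import time

def retire_idle_workers(client, check_every=60):
    # periodically retire any worker that currently has no tasks assigned
    while True:
        processing = client.processing()  # {worker address: tasks assigned}
        idle = [addr for addr, tasks in processing.items() if not tasks]
        if idle:
            client.retire_workers(workers=idle)
        time.sleep(check_every)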
Tests are failing on master: https://travis-ci.org/github/RhodiumGroup/rhg_compute_tools/jobs/672710341#L2716
This was originally discovered in @dpa9694's PR #65.
Looks like this is related: pytest-dev/pytest#3518
Pytest seems to be wrapping fixtures multiple times when they are declared in testing modules, and recent versions of pytest prevent fixture re-wrapping; therefore all fixture declarations must be moved to separate (non-testing) modules, e.g. tests/fixtures.py.
For instance -P, which allows for persisting the modified times as gsutil is copying files. This matters for geoclaw plotting tools, annoyingly. I'll submit a PR for this.
import xarray as xr
import dask.array
import dask.distributed as dd
def dataarrays_from_delayed(futures, client=None):
'''
Returns a list of xarray dataarrays from a list of futures of dataarrays
Parameters
----------
futures : list
list of :py:class:`dask.delayed.Future` objects holding
:py:class:`xarray.DataArray` objects.
client : object, optional
:py:class:`dask.distributed.Client` to use in gathering
metadata on futures. If not provided, client is inferred
from context.
Returns
-------
arrays : list
list of :py:class:`xarray.DataArray` objects with
:py:class:`dask.array.Array` backends.
'''
if client is None:
client = dd.get_client()
delayed_arrays = client.map(lambda x: x.data, futures)
dask_array_metadata = client.gather(
client.map(lambda x: (x.data.shape, x.data.dtype), futures))
dask_arrays = [
dask.array.from_delayed(delayed_arrays[i], *dask_array_metadata[i])
for i in range(len(futures))]
array_metadata = client.gather(
client.map(
lambda x: {'dims': x.dims, 'coords': x.coords, 'attrs': x.attrs},
futures))
data_arrays = [
xr.DataArray(dask_arrays[i], **array_metadata[i])
for i in range(len(futures))]
return data_arrays
def dataarray_from_delayed(futures, dim=None, client=None):
'''
Returns a DataArray from a list of futures of dataarrays concatenated along ``dim``
Parameters
----------
futures : list
list of :py:class:`dask.delayed.Future` objects holding
:py:class:`xarray.DataArray` objects.
dim : str, optional
dimension along which to concat :py:class:`xarray.DataArray`.
Inferred by default.
client : object, optional
:py:class:`dask.distributed.Client` to use in gathering
metadata on futures. If not provided, client is inferred
from context.
Returns
-------
array : object
:py:class:`xarray.DataArray` concatenated along ``dim`` with
a :py:class:`dask.array.Array` backend.
'''
data_arrays = dataarrays_from_delayed(futures, client=client)
da = xr.concat(data_arrays, dim=dim)
return da
def datasets_from_delayed(futures, client=None):
'''
Returns a list of xarray datasets from a list of futures of datasets
Parameters
----------
futures : list
list of :py:class:`dask.delayed.Future` objects holding
:py:class:`xarray.Dataset` objects.
client : object, optional
:py:class:`dask.distributed.Client` to use in gathering
metadata on futures. If not provided, client is inferred
from context.
Returns
-------
arrays : list
list of :py:class:`xarray.Dataset` objects with
:py:class:`dask.array.Array` backends for each variable.
'''
if client is None:
client = dd.get_client()
data_var_keys = client.gather(client.map(lambda x: list(x.data_vars.keys()), futures))
delayed_arrays = [
{k: client.submit(lambda x: x[k].data, futures[i]) for k in data_var_keys[i]}
for i in range(len(futures))]
dask_array_metadata = [
{k: client.submit(lambda x: (x[k].data.shape, x[k].data.dtype), futures[i]).result() for k in data_var_keys[i]}
for i in range(len(futures))]
dask_data_arrays = [
{k: dask.array.from_delayed(delayed_arrays[i][k], *dask_array_metadata[i][k]) for k in data_var_keys[i]}
for i in range(len(futures))]
array_metadata = [
{k: client.submit(
lambda x: {'dims': x[k].dims, 'coords': x[k].coords, 'attrs': x[k].attrs},
futures[i]).result()
for k in data_var_keys[i]}
for i in range(len(futures))]
data_arrays = [
{k: xr.DataArray(dask_data_arrays[i][k], **array_metadata[i][k]) for k in data_var_keys[i]}
for i in range(len(futures))]
datasets = [xr.Dataset(arr) for arr in data_arrays]
dataset_metadata = client.gather(
client.map(lambda x: x.attrs, futures))
for i in range(len(futures)):
datasets[i].attrs.update(dataset_metadata[i])
return datasets
def dataset_from_delayed(futures, dim=None, client=None):
'''
Returns an :py:class:`xarray.Dataset` from a list of futures of datasets concatenated along ``dim``
Parameters
----------
futures : list
list of :py:class:`dask.delayed.Future` objects holding
:py:class:`xarray.Dataset` objects.
dim : str, optional
dimension along which to concat :py:class:`xarray.Dataset`.
Inferred by default.
client : object, optional
:py:class:`dask.distributed.Client` to use in gathering
metadata on futures. If not provided, client is inferred
from context.
Returns
-------
array : object
:py:class:`xarray.Dataset` concatenated along ``dim`` with
:py:class:`dask.array.Array` backends for each variable.
'''
datasets = datasets_from_delayed(futures, client=client)
ds = xr.concat(datasets, dim=dim)
return ds
Imagine there's no globals
No longer hard to dooooooo
Every object is local
Whitelisting's up to youuuuuuuu
import dis
import toolz
import inspect
import functools
import types
import collections.abc
_functions_and_modules = (
types.FunctionType,
types.ModuleType,
types.MethodType,
types.BuiltinMethodType,
types.BuiltinFunctionType
)
@toolz.functoolz.curry
def block_globals(obj, allowed_types=None, include_defaults=True, whitelist=None):
"""
Decorator to prevent the use of undefined closures and globals in functions and classes
Parameters
----------
obj : function or class
Function or class to decorate. All globals not matching one of the allowed
types will raise a TypeError
allowed_types : type or tuple of types, optional
Types which are allowed as globals. By default, functions and
modules are allowed. The full set of allowed types is drawn from
the ``types`` module, and includes :py:class:`~types.FunctionType`,
:py:class:`~types.ModuleType`, :py:class:`~types.MethodType`,
:py:class:`~types.BuiltinMethodType`, and
:py:class:`~types.BuiltinFunctionType`.
include_defaults : bool, optional
If allowed_types is provided, setting ``include_defaults`` to True will
append the default list of functions, modules, and methods to the
user-passed list of allowed types. Default is True. Setting to False will
allow only the types passed in ``allowed_types``.
whitelist : list of str, optional
Optional list of variable names to whitelist. If a list is provided,
global variables will be compared to elements of this list based on
their string names. Default (None) is no whitelist.
Examples
--------
Wrap a function to block globals:
.. code-block:: python
>>> my_data = 10
>>> @block_globals
... def add_5(data):
... ''' can you spot the global? '''
... a_number = 5
... result = a_number + my_data
... return result # doctest: +ELLIPSIS
...
Traceback (most recent call last):
...
TypeError: Illegal <class 'int'> global found in add_5: my_data
Wrapping a class will prevent globals from being used in all methods:
.. code-block:: python
>>> @block_globals
... class MyClass:
...
... @staticmethod
... def add_5(data):
... ''' can you spot the global? '''
... a_number = 5
... result = a_number + my_data
... return result # doctest: +ELLIPSIS
...
Traceback (most recent call last):
...
TypeError: Illegal <class 'int'> global found in add_5: my_data
By default, functions and modules are allowed in the list of globals. You
can modify this list with the ``allowed_types`` argument:
.. code-block:: python
>>> result_formatter = 'my number is {}'
>>> @block_globals(allowed_types=str)
... def add_5(data):
... ''' only allowed globals here! '''
... a_number = 5
... result = a_number + data
... return result_formatter.format(result)
...
>>> add_5(3)
'my number is 8'
block_globals will also catch undefined references:
.. code-block:: python
>>> @block_globals
... def get_mean(df):
... return da.mean() # doctest: +ELLIPSIS
Traceback (most recent call last):
...
TypeError: Undefined global in get_mean: da
"""
if allowed_types is None:
    allowed_types = _functions_and_modules
else:
    if not isinstance(allowed_types, collections.abc.Sequence):
        allowed_types = [allowed_types]
    if include_defaults:
        allowed_types = list(allowed_types) + list(_functions_and_modules)
    allowed_types = tuple(allowed_types)
if whitelist is None:
whitelist = []
if isinstance(obj, type):
for attr in obj.__dict__:
if callable(getattr(obj, attr)):
setattr(obj, attr, block_globals(getattr(obj, attr)))
return obj
closurevars = inspect.getclosurevars(obj)
for instr in dis.get_instructions(obj):
if instr.opname == 'LOAD_GLOBAL':
if instr.argval in closurevars.globals:
if instr.argval in whitelist:
continue
g = closurevars.globals[instr.argval]
if not isinstance(g, allowed_types):
raise TypeError('Illegal {} global found in {}: {}'.format(type(g), obj.__name__, instr.argval))
else:
raise TypeError('Undefined global in {}: {}'.format(obj.__name__, instr.argval))
@functools.wraps(obj)
def inner(*args, **kwargs):
return obj(*args, **kwargs)
return inner
Some sort of easy caching utility would be great. Something like a decorator that can accept a filepattern and an overwrite argument on write.
Does something like this already exist? Also would be great to have this work with intake!
Lots to still work out here, but here's a stab:
import functools
import inspect
import os

import pandas as pd
import toolz
@toolz.curry
def cache_results(
func,
storage_dir=None,
storage_pattern=None,
create_directories=False,
reader=pd.read_csv,
writer=lambda x, fp, **kwargs: x.to_csv(fp, **kwargs),
ext='.csv',
read_kwargs=None,
write_kwargs=None):
read_kwargs = read_kwargs if read_kwargs is not None else {}
write_kwargs = write_kwargs if write_kwargs is not None else {}
arg_names = list(inspect.signature(func).parameters)

@functools.wraps(func)
def inner(*args, cache_path=None, overwrite=False, **kwargs):
    # convert positional args to kwargs so they can be used to build paths
    kwargs = dict(**dict(zip(arg_names, args)), **kwargs)
    if cache_path is None:
        if storage_pattern is not None:
            cache_path = storage_pattern.format(**kwargs)
        elif storage_dir is not None:
            # TODO: create function some_hash_of_kwargs to handle different
            # orderings, usage of defaults, etc. stably
            cache_path = os.path.join(
                storage_dir, some_hash_of_kwargs(kwargs) + ext)
        else:
            raise ValueError(
                'must provide cache_path or define storage_dir '
                'or storage_pattern at function decoration')
    if not overwrite:
        try:
            return reader(cache_path, **read_kwargs)
        except (IOError, OSError, ValueError):
            pass
    res = func(**kwargs)
    if create_directories:
        os.makedirs(os.path.dirname(cache_path), exist_ok=True)
    writer(res, cache_path, **write_kwargs)
    return res
return inner
This could be extended with a number of format-specific decorators quite easily
cache_in_netcdf = cache_results(
reader=xr.open_dataset,
writer=lambda ds, fp, **kwargs: ds.to_netcdf(fp, **kwargs),
ext='.nc')
cache_in_zarr = cache_results(
reader=xr.open_zarr,
writer=lambda ds, fp, **kwargs: ds.to_zarr(fp, **kwargs),
ext='.zarr')
cache_in_parquet = cache_results(
reader=pd.read_parquet,
writer=lambda df, fp, **kwargs: df.to_parquet(fp, **kwargs),
ext='.parquet')
def cache_in_pickle(*args, **kwargs):
import pickle
def reader(fp, **kw):
with open(fp, 'rb') as f:
return pickle.load(f)
def writer(data, fp, **kw):
with open(fp, 'wb') as f:
pickle.dump(data, f)
return cache_results(*args, reader=reader, writer=writer, ext='.pkl', **kwargs)
These could then be used in a variety of ways.
No arguments on decoration requires that a path be provided when called:
@cache_results
def generate_random_df(length):
    return pd.DataFrame({'random': np.random.random(length)})
df = generate_random_df(4, cache_path='my_length_4_df.csv')
Providing a storage pattern allows you to set up a complex directory structure
@cache_in_netcdf(
storage_pattern='/data/transformed/tasmax_squared/{rcp}/{model}/{year}.nc',
create_directories=True)
def square_tasmax(rcp, model, year):
tasmax_pattern = '/data/source/nasa-nex/tasmax/{rcp}/{model}/{year}.nc'
return xr.open_dataset(tasmax_pattern.format(rcp=rcp, model=model, year=year)) ** 2
results = []
for rcp in ['rcp45', 'rcp85']:
for model in ['ACCESS1-0', 'IPSL-ESM-CHEM']:
for year in [2020, 2050, 2090]:
# each of these results will be cached in a different file
results.append(((rcp, model, year), square_tasmax(rcp, model, year)))
We can also pass reader/writer kwargs for more complex IO:
@cache_in_parquet(
read_kwargs=dict(storage_options={'token': 'cloud'}),
write_kwargs=dict(storage_options={'token': 'cloud'}))
def my_long_pandas_operation():
time.sleep(4)
return pd.DataFrame(np.random.random((6, 2)), columns=['a', 'b'])
df = my_long_pandas_operation(cache_path='gs://my_project/my_big_file.parquet')
Once the argument hashing in the TODO referenced above is implemented, we could handle arbitrarily complex argument calls, which will be hashed to form a unique, stable file name, e.g.:
@cache_in_pickle(storage_dir='/data/cached_noaa_api_calls')
def call_noaa_api(*args, **kwargs):
return noaa_api.query(*args, **kwargs)
Something that specifies our normal, bigmem, TC, slosh, etc. workers would be great.
Lambda functions in the xarray utils (e.g. dataarray_from_futures) are difficult to read, understand, and debug. Make them real functions, Mike!
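For example, the metadata-gathering lambda in dataarrays_from_delayed could become a named, importable helper; a sketch (the helper name is made up):

def _get_dataarray_metadata(da):
    # everything needed to rebuild the DataArray wrapper locally
    return {'dims': da.dims, 'coords': dict(da.coords), 'attrs': da.attrs}

# then, inside dataarrays_from_delayed:
# array_metadata = client.gather(client.map(_get_dataarray_metadata, futures))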
fig or ax handle