rhg_compute_tools's Introduction

RHG Compute Tools

Tools for using compute.rhg.com and compute.impactlab.org

Installation

pip:

pip install rhg_compute_tools

Features

Kubernetes tools

  • Easily spin up a preconfigured cluster with get_cluster(), or a specific size with get_micro_cluster(), get_standard_cluster(), get_big_cluster(), or get_giant_cluster().
>>> import rhg_compute_tools.kubernetes as rhgk
>>> cluster, client = rhgk.get_cluster()

Google cloud storage utilities

  • Utilities for managing Google Cloud Storage directories in parallel, from the command line or via a Python API
>>> import rhg_compute_tools.gcs as gcs
>>> gcs.sync_gcs('my_data_dir', 'gs://my-bucket/my_data_dir')

rhg_compute_tools's People

Contributors

bolliger32, brews, dallen5, delgadom

rhg_compute_tools's Issues

kubectl tools

Not sure exactly where these should go. But here are some useful kubectl templates

Interacting directly with pods

Find one or more named directories on all pods:

Calls python on every pod matching a search criterion and reports which of the named directories exist there

kubectl -n NAMESPACE get pods | grep SEARCH_CRITERIA | awk '{ print $1 }' | xargs -I@ kubectl exec -n NAMESPACE @ -- /opt/conda/envs/worker/bin/python -c 'import sys; import os; matches=[f for f in os.listdir("PATH_TO_SEARCH_DIR") if f in [DIRNAME_OR_LIST_OF_DIRNAMES]]; print((str(sys.argv[1]) + ": " +  str(matches) + "\n") if len(matches) > 0 else "", end="");' @

For example, this finds specific geoclaw run directories on test-hub pods matching the pattern "dask-delgadom*":

kubectl -n test-hub get pods | grep dask-delgadom | awk '{ print $1 }' | xargs -I@ kubectl exec -n test-hub @ -- /opt/conda/envs/worker/bin/python -c 'import sys; import os; matches=[f for f in os.listdir("/") if f in ["gfdl5_rcp45_2007_2025_1825", "miroc5_rcp45_2007_2025_1083", "ccsm4_rcp85_2007_2025_626", "ccsm4_rcp85_2007_2025_1035", "mpi5_rcp45_2055_2065_668"]]; print((str(sys.argv[1]) + ": " +  str(matches) + "\n") if len(matches) > 0 else "", end="");' @
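
The Python one-liner embedded in the command above is easier to follow when unpacked. A minimal sketch of the same check as a standalone function; the name find_matching_dirs and the sample directory names are made up for illustration:

```python
import os
import tempfile


def find_matching_dirs(search_dir, wanted):
    """Return the entries of ``search_dir`` whose names appear in ``wanted``."""
    wanted = set(wanted)
    return sorted(f for f in os.listdir(search_dir) if f in wanted)


# Demonstrate on a throwaway directory (hypothetical run names)
with tempfile.TemporaryDirectory() as tmp:
    for name in ["run_a", "run_b", "unrelated"]:
        os.mkdir(os.path.join(tmp, name))
    matches = find_matching_dirs(tmp, ["run_a", "run_b", "run_c"])

print(matches)
```

In the kubectl version, the pod name is passed in as sys.argv[1] so that each line of output can be traced back to the pod it came from.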

Managing pods

Delete "Succeeded" pods

kubectl -n NAMESPACE get pods | grep Completed | awk '{print $1}' | xargs kubectl -n NAMESPACE delete pods

For example, on the main compute.rhg deployment:

kubectl -n rhodium-jupyter get pods | grep Completed | awk '{print $1}' | xargs kubectl -n rhodium-jupyter delete pods
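
Note that grep matches "Completed" anywhere on the line, including in a pod name. A stricter filter can be sketched in Python by reading only the STATUS column of the default `kubectl get pods` table. The function name and the sample output below are made up for illustration:

```python
def completed_pod_names(kubectl_output):
    """Return names of pods whose STATUS column reads ``Completed``.

    Expects the default table printed by ``kubectl get pods``.
    """
    lines = kubectl_output.strip().splitlines()
    status_col = lines[0].split().index("STATUS")
    names = []
    for line in lines[1:]:
        fields = line.split()
        if len(fields) > status_col and fields[status_col] == "Completed":
            names.append(fields[0])
    return names


# Made-up sample output for illustration
sample = """\
NAME              READY   STATUS      RESTARTS   AGE
dask-worker-abc   1/1     Running     0          5m
job-runner-123    0/1     Completed   0          2h
job-runner-456    0/1     Completed   0          3h
"""

print(completed_pod_names(sample))
```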

rhg_compute_tools.gcs.cp might not always do what we expect

I'm not 100% positive I wrote the cp command correctly to handle cases where you are copying directories to other directories. The rules for exactly how the copy is conducted, depending on whether or not you specify -r, whether you are using single or double wildcard characters, and whether you are copying files or directories, are quite complicated. It's making my head spin and so I gave up trying to fix things for now. sync is often a simpler and easier option b/c you know you are syncing a directory to a directory. But at some point, we should take a look at cp and make sure it's working appropriately.

add simple caching utility

Some sort of easy caching utility would be great. Something like a decorator that can accept a filepattern and an overwrite argument on write.

Does something like this already exist? Also would be great to have this work with intake!

Proposed implementation

Lots to still work out here, but here's a stab:

import functools
import inspect
import os

import pandas as pd
import toolz


@toolz.curry
def cache_results(
        func,
        storage_dir=None,
        storage_pattern=None,
        create_directories=False,
        reader=pd.read_csv,
        writer=lambda x, fp, **kwargs: x.to_csv(fp, **kwargs),
        ext='.csv',
        read_kwargs=None,
        write_kwargs=None):
    read_kwargs = read_kwargs if read_kwargs is not None else {}
    write_kwargs = write_kwargs if write_kwargs is not None else {}

    @functools.wraps(func)
    def inner(*args, cache_path=None, overwrite=False, **kwargs):
        # map positional args onto their parameter names so that they can be
        # used to fill in storage_pattern
        call_kwargs = dict(
            inspect.signature(func).bind(*args, **kwargs).arguments)

        if cache_path is None:
            if storage_pattern is not None:
                cache_path = storage_pattern.format(**call_kwargs)
            elif storage_dir is not None:
                # TODO: create function some_hash_of_kwargs to handle different
                # orderings, usage of defaults, etc. stably
                cache_path = os.path.join(
                    storage_dir, some_hash_of_kwargs(call_kwargs) + ext)

            else:
                raise ValueError(
                   'must provide cache_path or define storage_dir '
                   'or storage_pattern at function decoration')

        if not overwrite:
            try:
                return reader(cache_path, **read_kwargs)
            except (IOError, OSError, ValueError):
                # no readable cache yet; fall through and compute the result
                pass

        res = func(*args, **kwargs)
        if create_directories:
            # create the target directory just before writing
            os.makedirs(os.path.dirname(cache_path), exist_ok=True)
        writer(res, cache_path, **write_kwargs)
        return res
    return inner

This could be extended with a number of format-specific decorators quite easily

import xarray as xr

# the defaults of cache_results already read and write CSV
cache_in_csv = cache_results

cache_in_netcdf = cache_results(
    reader=xr.open_dataset,
    writer=lambda ds, fp, **kwargs: ds.to_netcdf(fp, **kwargs),
    ext='.nc')

cache_in_zarr = cache_results(
    reader=xr.open_zarr,
    writer=lambda ds, fp, **kwargs: ds.to_zarr(fp, **kwargs),
    ext='.zarr')

cache_in_parquet = cache_results(
    reader=pd.read_parquet,
    writer=lambda df, fp, **kwargs: df.to_parquet(fp, **kwargs),
    ext='.parquet')

def cache_in_pickle(*args, **kwargs):
    import pickle

    def reader(fp, **kw):
        with open(fp, 'rb') as f:
            return pickle.load(f)

    def writer(data, fp, **kw):
        with open(fp, 'wb') as f:
            pickle.dump(data, f)

    # use a pickle extension unless the caller asks for another one
    kwargs.setdefault('ext', '.pkl')
    return cache_results(*args, reader=reader, writer=writer, **kwargs)

Proposed usage

These could then be used in a variety of ways.

No arguments on decoration requires that a path be provided when called:

@cache_in_csv
def generate_random_df(length):
    return pd.DataFrame({'random': np.random.random(length)})

df = generate_random_df(4, cache_path='my_length_4_df.csv')

Providing a storage pattern allows you to set up a complex directory structure

@cache_in_netcdf(
        storage_pattern='/data/transformed/tasmax_squared/{rcp}/{model}/{year}.nc',
        create_directories=True)
def square_tasmax(rcp, model, year):
    tasmax_pattern = '/data/source/nasa-nex/tasmax/{rcp}/{model}/{year}.nc'
    return xr.open_dataset(tasmax_pattern.format(rcp=rcp, model=model, year=year)) ** 2

results = []
for rcp in ['rcp45', 'rcp85']:
    for model in ['ACCESS1-0', 'IPSL-ESM-CHEM']:
        for year in [2020, 2050, 2090]:
            # each of these results will be cached in a different file
            results.append(((rcp, model, year), square_tasmax(rcp, model, year)))

We can also pass reader/writer kwargs for more complex IO:

@cache_in_parquet(
        read_kwargs=dict(storage_options={'token': 'cloud'}),
        write_kwargs=dict(storage_options={'token': 'cloud'}))
def my_long_pandas_operation():
    time.sleep(4)
    return pd.DataFrame(np.random.random((6, 2)), columns=['a', 'b'])

df = my_long_pandas_operation(cache_path='gs://my_project/my_big_file.parquet')

Once the argument hashing in the TODO referenced above is implemented, we could handle arbitrarily complex argument calls, which will be hashed to form a unique, stable file name, e.g.:

@cache_in_pickle(storage_dir='/data/cached_noaa_api_calls')
def call_noaa_api(*args, **kwargs):
    return noaa_api.query(*args, **kwargs)

TODO

  • Look harder to see if something like this already exists!
  • Implement the hashing and arg/kwarg inspection features
  • Make sure the hashing implementation is stable and would not ever return wrong results (e.g. for changing defaults... ugh).
  • Maybe implement staleness redo criteria?
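
One possible shape for the hashing item above, as a rough sketch rather than a settled design. It assumes arguments are JSON-encodable (anything else falls back to repr, which may not be stable across sessions). Unlike the some_hash_of_kwargs(kwargs) call in the proposal, this version also takes the function, so that positional arguments and defaults can be resolved by name:

```python
import hashlib
import inspect
import json


def some_hash_of_kwargs(func, args, kwargs):
    """Hash a call's arguments so that equivalent calls share one file name."""
    bound = inspect.signature(func).bind(*args, **kwargs)
    # fill in defaults so f(1) and f(1, b=2) hash identically when b defaults to 2
    bound.apply_defaults()
    # sort_keys makes the encoding independent of keyword order; default=repr
    # is a crude fallback for values json cannot encode
    encoded = json.dumps(dict(bound.arguments), sort_keys=True, default=repr)
    return hashlib.sha256(encoded.encode("utf-8")).hexdigest()[:16]


def example(a, b=2, *, scale=1.0):
    return (a + b) * scale


h1 = some_hash_of_kwargs(example, (1,), {})
h2 = some_hash_of_kwargs(example, (), {"b": 2, "a": 1})
h3 = some_hash_of_kwargs(example, (1,), {"b": 3})
print(h1 == h2, h1 == h3)
```

Because defaults are written into the hash, changing a default value changes the file name for calls that rely on it, so a stale result computed under the old default would not be returned.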

add globals blocker

Imagine there's no globals
No longer hard to dooooooo
Every object is local
Whitelisting's up to youuuuuuuu

import dis
import toolz
import inspect
import functools
import types
import collections.abc

_functions_and_modules = (
    types.FunctionType,
    types.ModuleType,
    types.MethodType,
    types.BuiltinMethodType,
    types.BuiltinFunctionType
)


@toolz.functoolz.curry
def block_globals(obj, allowed_types=None, include_defaults=True, whitelist=None):
    """
    Decorator to prevent the use of undefined closures and globals in functions and classes
    
    Parameters
    ----------
    obj : function or class
        Function or class to decorate. Any global not matching one of the
        allowed types will raise a TypeError
    allowed_types : type or tuple of types, optional
        Types which are allowed as globals. By default, functions and
        modules are allowed. The full set of allowed types is drawn from
        the ``types`` module, and includes :py:class:`~types.FunctionType`, 
        :py:class:`~types.ModuleType`, :py:class:`~types.MethodType`, 
        :py:class:`~types.BuiltinMethodType`, and
        :py:class:`~types.BuiltinFunctionType`.
    include_defaults : bool, optional
        If ``allowed_types`` is provided, setting ``include_defaults`` to True
        will append the default list of functions, modules, and methods to the
        user-passed list of allowed types. Default is True. Setting to False
        will allow only the types passed in ``allowed_types``.
    whitelist : list of str, optional
        Optional list of variable names to whitelist. If a list is provided,
        global variables will be compared to elements of this list based on
        their string names. Default (None) is no whitelist.

    Examples
    --------
    
    Wrap a function to block globals:
    
    .. code-block:: python

        >>> my_data = 10

        >>> @block_globals
        ... def add_5(data):
        ...     ''' can you spot the global? '''
        ...     a_number = 5
        ...     result = a_number + my_data
        ...     return result  # doctest: +ELLIPSIS
        ...
        Traceback (most recent call last):
        ...
        TypeError: Illegal <class 'int'> global found in add_5: my_data
        
    Wrapping a class will prevent globals from being used in all methods:
    
    .. code-block:: python

        >>> @block_globals
        ... class MyClass:
        ...
        ...     @staticmethod
        ...     def add_5(data):
        ...         ''' can you spot the global? '''
        ...         a_number = 5
        ...         result = a_number + my_data
        ...         return result  # doctest: +ELLIPSIS
        ...
        Traceback (most recent call last):
        ...
        TypeError: Illegal <class 'int'> global found in add_5: my_data

    By default, functions and modules are allowed in the list of globals. You
    can modify this list with the ``allowed_types`` argument:
    
    .. code-block:: python

        >>> result_formatter = 'my number is {}'
        >>> @block_globals(allowed_types=str)
        ... def add_5(data):
        ...     ''' only allowed globals here! '''
        ...     a_number = 5
        ...     result = a_number + data
        ...     return result_formatter.format(result)
        ...
        >>> add_5(3)
        'my number is 8'

    block_globals will also catch undefined references:

    .. code-block:: python

        >>> @block_globals
        ... def get_mean(df):
        ...     return da.mean()  # doctest: +ELLIPSIS
        Traceback (most recent call last):
        ...
        TypeError: Undefined global in get_mean: da
    """
    
    if allowed_types is None:
        allowed_types = _functions_and_modules
    elif not isinstance(allowed_types, collections.abc.Sequence):
        # a single type was passed
        allowed_types = (allowed_types,)

    # isinstance requires a type or a tuple of types
    if include_defaults:
        allowed_types = tuple(allowed_types) + _functions_and_modules
    else:
        allowed_types = tuple(allowed_types)

    if whitelist is None:
        whitelist = []

    if isinstance(obj, type):
        # decorate each callable attribute of the class with the same settings
        for attr in list(obj.__dict__):
            if callable(getattr(obj, attr)):
                setattr(obj, attr, block_globals(
                    getattr(obj, attr),
                    allowed_types=allowed_types,
                    include_defaults=False,
                    whitelist=whitelist))
        return obj

    closurevars = inspect.getclosurevars(obj)
    for instr in dis.get_instructions(obj):
        if instr.opname == 'LOAD_GLOBAL':
            if instr.argval in closurevars.globals:
                if instr.argval in whitelist:
                    continue
                g = closurevars.globals[instr.argval]
                if not isinstance(g, allowed_types):
                    raise TypeError('Illegal {} global found in {}: {}'.format(type(g), obj.__name__, instr.argval))
            elif instr.argval in closurevars.builtins:
                # builtins such as len() are also loaded with LOAD_GLOBAL
                continue
            else:
                raise TypeError('Undefined global in {}: {}'.format(obj.__name__, instr.argval))

    @functools.wraps(obj)
    def inner(*args, **kwargs):
        return obj(*args, **kwargs)

    return inner
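
The detection step relies on two standard-library tools: dis.get_instructions lists the bytecode of a function, and inspect.getclosurevars resolves the names it references. A stripped-down sketch of just that step, with a helper name (loaded_global_names) chosen here for illustration:

```python
import dis
import inspect

my_data = 10


def add_5(data):
    a_number = 5
    return a_number + my_data  # my_data is read from module scope


def loaded_global_names(func):
    """Names that ``func`` reads through LOAD_GLOBAL instructions."""
    return sorted({
        instr.argval
        for instr in dis.get_instructions(func)
        if instr.opname == "LOAD_GLOBAL"
    })


names = loaded_global_names(add_5)
resolved = inspect.getclosurevars(add_5).globals
print(names)
print(resolved)
```

Local variables and arguments (a_number, data) are loaded with different opcodes, which is why only my_data shows up.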

add cluster spinup & job management utilities

Cluster spinup

Spin up a cluster and (optionally) wait for workers to appear, with a progress bar. Optionally use as a context manager to spin down the cluster after job execution.

import time

import rhg_compute_tools.kubernetes as rhgk

try:
    from tqdm.auto import tqdm
except ImportError:
    from tqdm import tqdm_notebook as tqdm


class setup_cluster(object):
    '''
    Scales up a dask cluster, with the option to block until the workers are available

    If ``setup_cluster`` is used as a context manager, the workers will be spun down
    upon exit. Note that this does not automatically wait for all tasks to be
    completed, so care should be taken to ensure all jobs will block until the context
    is exited. Because the workers will be spun down after job completion,
    debugging tasks can be difficult. Therefore, it's recommended that this not be
    used for prototyping, and that tasks have their own mechanism for error
    handling and possibly reporting when using ``setup_cluster`` this way.

    Parameters
    ----------
    nworkers : int
        Number of workers to create
    cluster_creator : func, optional
        Cluster creation function. Object returned must have ``scale`` and ``close``
        methods. Default is :py:func:`rhg_compute_tools.kubernetes.get_cluster`.
    cluster_kwargs : dict, optional
        Keyword arguments passed to ``cluster_creator``. Default ``{}``
    block : bool, optional
        Whether to block until all workers have come online. Default ``True``.
    pbar : bool, optional
        Whether to create a tqdm progress bar displaying worker spinup. Ignored
        if ``block`` is ``False``. Default ``True``.
    pbar_kwargs : dict, optional
        Keyword arguments passed to :py:func:`tqdm.auto.tqdm`

    Examples
    --------
    Can be used as a helper to scale up workers:

    .. code-block:: python

        >>> s = setup_cluster(10)
        >>> client, cluster = s.scale_and_wait_for_workers()  # doctest: +SKIP
        100%|██████████| 10/10 [00:12<00:00,  21.72s/it]
        >>> futures = client.map(lambda x: x**2, range(20))  # doctest: +SKIP

    Alternatively, can be used as a context manager:

    .. code-block:: python

        >>> with setup_cluster(10, pbar_kwargs={'desc': 'workers'}) as (client, cluster):
        ...     futures = client.map(lambda x: x**2, range(20))
        ...     wait_for_futures(
        ...         futures,
        ...         pbar_kwargs={'desc': 'jobs'})  # doctest: +SKIP
        ...
        workers: 100%|██████████| 10/10 [00:12<00:00,  1.20s/it]
        jobs: 100%|██████████| 10/10 [00:01<00:00,  9.83it/s]
    '''
    def __init__(
            self,
            nworkers,
            cluster_creator=None,
            cluster_kwargs=None,
            block=True,
            pbar=True,
            pbar_kwargs=None):
        self.nworkers = nworkers
        self.cluster_creator = (
            cluster_creator if cluster_creator is not None else rhgk.get_cluster)
        self.cluster_kwargs = cluster_kwargs if cluster_kwargs is not None else {}
        self.block = block
        self.pbar = pbar
        self.pbar_kwargs = pbar_kwargs if pbar_kwargs is not None else {}

    def scale_and_wait_for_workers(self):
        self.client, self.cluster = self.cluster_creator(**self.cluster_kwargs)
        self.cluster.scale(self.nworkers)

        if self.block:
            pbar = None
            if self.pbar:
                # user-supplied pbar_kwargs override the defaults
                tqdm_kwargs = dict(total=self.nworkers, desc='workers')
                tqdm_kwargs.update(self.pbar_kwargs)
                pbar = tqdm(**tqdm_kwargs)

            # block until enough workers are online, with or without a bar
            while True:
                nworkers = len(self.client.ncores())
                if pbar is not None:
                    pbar.n = nworkers
                    pbar.refresh()
                if nworkers >= self.nworkers:
                    break
                time.sleep(0.2)

            if pbar is not None:
                pbar.close()

        return self.client, self.cluster

    def __enter__(self):
        return self.scale_and_wait_for_workers()

    def __exit__(self, *args, **kwargs):
        self.cluster.scale(0)
        self.client.close()
        self.cluster.close()
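
A loop that waits for workers never returns if they fail to come online. A small sketch of the same polling with a timeout; the timeout parameter and the wait_for_workers name are additions for illustration and not part of the proposal above:

```python
import time


def wait_for_workers(count_workers, target, poll_interval=0.2, timeout=60.0):
    """Poll ``count_workers()`` until it reaches ``target`` or time runs out.

    Returns the last observed worker count.
    """
    deadline = time.monotonic() + timeout
    n = count_workers()
    while n < target and time.monotonic() < deadline:
        time.sleep(poll_interval)
        n = count_workers()
    return n


# Stand-in for ``lambda: len(client.ncores())``: one more worker per poll
state = {"n": 0}


def fake_count():
    state["n"] += 1
    return state["n"]


reached = wait_for_workers(fake_count, target=3, poll_interval=0.01, timeout=5)
stalled = wait_for_workers(lambda: 0, target=3, poll_interval=0.01, timeout=0.05)
print(reached, stalled)
```

The caller can compare the returned count with the target and decide whether to proceed with fewer workers or raise an error.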

Task management

Wait for futures to complete, with a progress bar

import dask.distributed as dd


def wait_for_futures(futures, pbar_kwargs=None):
    '''
    Blocking progress bar for dask futures

    Provides a progress bar which will block the python interpreter until
    all futures are completed

    Parameters
    ----------
    futures : list or dask object
        list of dask futures objects, or a dask collection such as a
        Dataframe or Array object with a dask attribute
    pbar_kwargs : dict, optional
        Keyword arguments passed to tqdm.auto.tqdm constructor
    '''
    if pbar_kwargs is None:
        pbar_kwargs = {}

    if hasattr(futures, 'dask'):
        futures = futures.dask.values()

    pbar = tqdm(dd.as_completed(futures), total=len(futures), **pbar_kwargs)

    errors = 0
    for f in pbar:
        if f.status == 'error':
            errors += 1
            pbar.set_postfix({'errors': errors})

Other useful features

  • Task progress save/recovery (maybe a job for parameterize_jobs)
  • Job caching utilities (a decorator, probably?)

Couple of bugs in kubernetes.py

  • isintance instead of isinstance
  • had not imported warnings

These were addressed in #71, which we are closing now so that we can first merge a purely black-only PR. We will then file a new PR from the autofmt branch that specifically addresses these bugs and is easier to review.

spin down workers as tasks finish

Description

For large client.map workflows with long-running tasks, it would be nice to have a process we could spin off that shuts down idle workers as tasks complete, once no tasks are left waiting.
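
The selection rule could look something like the sketch below. It works on a plain mapping of worker address to assigned task keys, which is the shape dask.distributed's Client.processing() returns as far as I know; the chosen workers could then be handed to Client.retire_workers(workers=...). How to count waiting tasks is left open here, so n_waiting is simply a parameter, and the addresses are made up:

```python
def idle_workers(processing, n_waiting=0):
    """Pick workers that could be retired.

    ``processing`` maps worker address -> collection of task keys currently
    assigned to it. Nothing is retired while ``n_waiting`` tasks are queued.
    """
    if n_waiting > 0:
        return []
    return sorted(addr for addr, tasks in processing.items() if not tasks)


# Made-up snapshot of a three-worker cluster
snapshot = {
    "tcp://10.0.0.1:4000": ("task-17",),
    "tcp://10.0.0.2:4000": (),
    "tcp://10.0.0.3:4000": (),
}

print(idle_workers(snapshot))
print(idle_workers(snapshot, n_waiting=4))
```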

CI broken: pytest raises "fixture applied more than once"

Tests are failing on master: https://travis-ci.org/github/RhodiumGroup/rhg_compute_tools/jobs/672710341#L2716

This was originally discovered in @dpa9694's PR #65.

Looks like this is related: pytest-dev/pytest#3518

Pytest seems to be wrapping fixtures multiple times when they are declared in testing modules, and recent versions of pytest prevent fixture re-wrapping, and therefore all fixture declarations must be moved to separate (non-testing) modules, e.g. tests/fixtures.py.

Design tools

Color palettes & chart prep

Color schemes

  • RHG Colors
  • Impactlab colors

Fonts

  • Auto-loaded fonts
  • Tools for creating correct sizes, fonts, etc based on application, use case

Chart creation tools

  • Automatic binning by application type, color scheme
  • Automatic formatting with labels, etc. for different types of charts

Options for implementation

  • Seaborn configuration
  • Manual functions accepting a fig or ax handle

Inconsistency in API for different gsutil commands

  • RHG Compute Tools version: current

Description

Right now we have cp_gcs, sync_gcs, and rm. In addition, each one takes src, dest, and a command-specific flags argument, e.g. cp_flags or sync_flags. We should make this consistent and just have cp, sync, and rm, each of which takes src, dest, and flags.
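
A rough sketch of what the uniform signatures could look like. This is not the package's implementation: the functions below only build the gsutil argument list (which could be passed to subprocess.run), and the private helper _gsutil is hypothetical. Since rm acts on a single location, it takes a path rather than src and dest:

```python
import shlex


def _gsutil(verb, paths, flags=None):
    """Build a ``gsutil -m <verb> [flags] <paths...>`` argument list."""
    if isinstance(flags, str):
        flags = shlex.split(flags)
    return ["gsutil", "-m", verb, *(flags or []), *paths]


def cp(src, dest, flags=None):
    return _gsutil("cp", [src, dest], flags)


def sync(src, dest, flags=None):
    return _gsutil("rsync", [src, dest], flags)


def rm(path, flags=None):
    return _gsutil("rm", [path], flags)


cmd = sync("my_data_dir", "gs://my-bucket/my_data_dir", flags="-r")
print(cmd)
```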

Should we instantiate a k8s cluster within CI for rhg_compute_tools.kubernetes unit tests?

In theory, you could do this with https://github.com/rancher/k3s. This would allow for some better testing of the get_cluster related commands. For example, right now the _get_cluster_dask_gateway unit test creates a dask gateway server with a local backend. The local backend doesn't permit the same config options as a kubernetes backend, so not all of our config options are testable. But if we made a local k8s cluster and then spun up the gateway server on that, they would be. I'd imagine there are some other existing or planned functions that could make use of this for unit testing.

Might be overkill... but could be useful

Drop refs to py2 support

The package and docs mention Python 2 support here and there. The code definitely can't run in Python 2, so these references should be scrubbed.

See #46 for starters.

fix gcs.rm

Description

Daniel used this function (with recursive=True) to remove a "directory" with many sub-directories and files within it. The directory disappeared from GCSFUSE but was still visible on the cloud console.

Additionally, I wrote this function to use the python API but sounds like calling gsutil would be faster due to parallelization.

So we should just rewrite this with gsutil.

add xarray_from_delayed

TODO

  • pull the client using actual dask tools, rather than having to paste these functions into a notebook with the client in scope
  • improve docs
  • write tests
  • maybe test it out and make a PR to xarray?

Prototype

import xarray as xr
import dask.array
import dask.distributed as dd


def dataarrays_from_delayed(futures, client=None):
    '''
    Returns a list of xarray dataarrays from a list of futures of dataarrays

    Parameters
    ----------
    futures : list
        list of :py:class:`dask.distributed.Future` objects holding
        :py:class:`xarray.DataArray` objects.
    client : object, optional
        :py:class:`dask.distributed.Client` to use in gathering
        metadata on futures. If not provided, client is inferred
        from context.

    Returns
    -------
    arrays : list
        list of :py:class:`xarray.DataArray` objects with
        :py:class:`dask.array.Array` backends.

    '''

    if client is None:
        client = dd.get_client()

    delayed_arrays = client.map(lambda x: x.data, futures)

    dask_array_metadata = client.gather(
        client.map(lambda x: (x.data.shape, x.data.dtype), futures))

    dask_arrays = [
        dask.array.from_delayed(delayed_arrays[i], *dask_array_metadata[i])
        for i in range(len(futures))]

    array_metadata = client.gather(
        client.map(
            lambda x: {'dims': x.dims, 'coords': x.coords, 'attrs': x.attrs},
            futures))

    data_arrays = [
        xr.DataArray(dask_arrays[i], **array_metadata[i])
        for i in range(len(futures))]

    return data_arrays


def dataarray_from_delayed(futures, dim=None, client=None):
    '''
    Returns a DataArray from a list of futures of dataarrays concatenated along ``dim``

    Parameters
    ----------
    futures : list
        list of :py:class:`dask.distributed.Future` objects holding
        :py:class:`xarray.DataArray` objects.
    dim : str, optional
        dimension along which to concat :py:class:`xarray.DataArray`.
        Inferred by default.
    client : object, optional
        :py:class:`dask.distributed.Client` to use in gathering
        metadata on futures. If not provided, client is inferred
        from context.

    Returns
    -------
    array : object
        :py:class:`xarray.DataArray` concatenated along ``dim`` with
        a :py:class:`dask.array.Array` backend.
    '''
    
    data_arrays = dataarrays_from_delayed(futures, client=client)
    da = xr.concat(data_arrays, dim=dim)
    
    return da


def datasets_from_delayed(futures, client=None):
    '''
    Returns a list of xarray datasets from a list of futures of datasets
    
    Parameters
    ----------
    futures : list
        list of :py:class:`dask.distributed.Future` objects holding
        :py:class:`xarray.Dataset` objects.
    client : object, optional
        :py:class:`dask.distributed.Client` to use in gathering
        metadata on futures. If not provided, client is inferred
        from context.
    
    Returns
    -------
    arrays : list
        list of :py:class:`xarray.Dataset` objects with
        :py:class:`dask.array.Array` backends for each variable.
    '''

    if client is None:
        client = dd.get_client()

    data_var_keys = client.gather(client.map(lambda x: list(x.data_vars.keys()), futures))
    
    delayed_arrays = [
        {k: client.submit(lambda x: x[k].data, futures[i]) for k in data_var_keys[i]}
        for i in range(len(futures))]

    dask_array_metadata = [
        {k: client.submit(lambda x: (x[k].data.shape, x[k].data.dtype), futures[i]).result() for k in data_var_keys[i]}
        for i in range(len(futures))]
    
    dask_data_arrays = [
        {k: dask.array.from_delayed(delayed_arrays[i][k], *dask_array_metadata[i][k]) for k in data_var_keys[i]}
        for i in range(len(futures))]
    
    array_metadata = [
            {k: client.submit(
                lambda x: {'dims': x[k].dims, 'coords': x[k].coords, 'attrs': x[k].attrs},
                futures[i]).result()
            for k in data_var_keys[i]}
        for i in range(len(futures))]

    data_arrays = [
        {k: xr.DataArray(dask_data_arrays[i][k], **array_metadata[i][k]) for k in data_var_keys[i]}
        for i in range(len(futures))]

    datasets = [xr.Dataset(arr) for arr in data_arrays]
    
    dataset_metadata = client.gather(
        client.map(lambda x: x.attrs, futures))
    
    for i in range(len(futures)):
        datasets[i].attrs.update(dataset_metadata[i])
    
    return datasets


def dataset_from_delayed(futures, dim=None, client=None):
    '''
    Returns an :py:class:`xarray.Dataset` from a list of futures of datasets concatenated along ``dim``

    Parameters
    ----------
    futures : list
        list of :py:class:`dask.distributed.Future` objects holding
        :py:class:`xarray.Dataset` objects.
    dim : str, optional
        dimension along which to concat :py:class:`xarray.Dataset`.
        Inferred by default.
    client : object, optional
        :py:class:`dask.distributed.Client` to use in gathering
        metadata on futures. If not provided, client is inferred
        from context.

    Returns
    -------
    array : object
        :py:class:`xarray.Dataset` concatenated along ``dim`` with
        :py:class:`dask.array.Array` backends for each variable.
    '''

    datasets = datasets_from_delayed(futures, client=client)
    ds = xr.concat(datasets, dim=dim)

    return ds

Add client.map helper functions

Goals

Provide functions for more semantic use of client.map and other dask workflows

  • expand/collapse for multi-arg function handling
  • provide jrnr-like parameter iteration tools

Prototype

import functools, itertools

def expand(func):
    '''
    Decorator to expand an (args, kwargs) tuple in function calls
    
    Intended for use with the :py:func:`collapse` function
    
    Parameters
    ----------
    func : function
        Function to have arguments expanded. Func can have any
        number of positional and keyword arguments.
    
    Returns
    -------
    wrapped : function
        Wrapped version of ``func`` which accepts a single 
        ``(args, kwargs)`` tuple.
    
    Examples
    --------
    
    .. code-block:: python

        In [1]: @expand
           ...: def my_func(a, b, exp=1):
           ...:     return (a * b)**exp
           ...:

        In [2]: my_func(((2, 3), {}))
        6

        In [3]: my_func(((2, 3, 2), {}))
        36

        In [4]: my_func((tuple([]), {'b': 4, 'exp': 2, 'a': 1}))
        16
        
    This function can be used in combination with the ``collapse`` helper function,
    which allows more natural parameter calls

    .. code-block:: python
        
        In [5]: my_func(collapse(2, 3, exp=2))
        36

    These can then be paired to enable many parameterized function calls:

    .. code-block:: python

        In [6]: func_calls = [collapse(a, a+1, exp=a) for a in range(5)]

        In [7]: list(map(my_func, func_calls))
        [1, 2, 36, 1728, 160000]

    '''

    @functools.wraps(func)
    def inner(ak):
        return func(*ak[0], **ak[1])
    return inner


def collapse(*args, **kwargs):
    '''
    Collapse positional and keyword arguments into an (args, kwargs) tuple
    
    Intended for use with the :py:func:`expand` decorator
    
    Parameters
    ----------
    *args
        Variable length argument list.
    **kwargs
        Arbitrary keyword arguments.
        
    Returns
    -------
    args : tuple
        Positional arguments tuple
    kwargs : dict
        Keyword argument dictionary
    '''
    return (args, kwargs)


def collapse_product(*args, **kwargs):
    '''
    Collapse the Cartesian product of the arguments into (args, kwargs) tuples

    Parameters
    ----------
    
    *args
        Variable length list of iterables
    **kwargs
        Keyword arguments, whose values must be iterables
        
    Returns
    -------
    iterator
        Generator with collapsed arguments
        
    See Also
    --------
    
    Function :py:func:`collapse`
    
    Examples
    --------
    
    .. code-block:: python

        In [1]: @expand
           ...: def my_func(a, b, exp=1):
           ...:     return (a * b)**exp
           ...:
    
        In [3]: product_args = collapse_product(
           ...:     [0, 1, 2],
           ...:     [0.5, 2],
           ...:     exp=[0, 1])
        
        In [4]: list(product_args)  # doctest: +NORMALIZE_WHITESPACE
           [
               ((0, 0.5), {'exp': 0}),
               ((0, 0.5), {'exp': 1}),
               ((0, 2), {'exp': 0}),
               ((0, 2), {'exp': 1}),
               ((1, 0.5), {'exp': 0}),
               ((1, 0.5), {'exp': 1}),
               ((1, 2), {'exp': 0}),
               ((1, 2), {'exp': 1}),
               ((2, 0.5), {'exp': 0}),
               ((2, 0.5), {'exp': 1}),
               ((2, 2), {'exp': 0}),
               ((2, 2), {'exp': 1})]
               
        In [5]: list(map(my_func, collapse_product([0, 1, 2], [0.5, 2], exp=[0, 1])))
        [1.0, 0.0, 1, 0, 1.0, 0.5, 1, 2, 1.0, 1.0, 1, 4]
    '''
    num_args = len(args)
    kwarg_keys = list(kwargs.keys())
    kwarg_vals = [kwargs[k] for k in kwarg_keys]
    
    format_iterations = lambda x: (tuple(x[:num_args]), dict(zip(kwarg_keys, x[num_args:])))
    
    return map(format_iterations, itertools.product(*args, *kwarg_vals))
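
To see how these pieces fit a client.map-style workflow, here is a self-contained sketch that uses ThreadPoolExecutor.map from the standard library as a stand-in for client.map. The expand and collapse_product definitions are condensed copies of the prototypes above (collapse_product is written as a generator here), and power_of_product is a made-up example function:

```python
import functools
import itertools
from concurrent.futures import ThreadPoolExecutor


def expand(func):
    """Wrap ``func`` to accept a single (args, kwargs) tuple."""
    @functools.wraps(func)
    def inner(ak):
        return func(*ak[0], **ak[1])
    return inner


def collapse_product(*args, **kwargs):
    """Yield (args, kwargs) tuples for every combination of the inputs."""
    keys = list(kwargs)
    n = len(args)
    for combo in itertools.product(*args, *(kwargs[k] for k in keys)):
        yield combo[:n], dict(zip(keys, combo[n:]))


@expand
def power_of_product(a, b, exp=1):
    return (a * b) ** exp


# materialize the jobs so they can be inspected and reused
jobs = list(collapse_product([1, 2], [3], exp=[1, 2]))

# ThreadPoolExecutor.map stands in for client.map here
with ThreadPoolExecutor(max_workers=2) as pool:
    results = list(pool.map(power_of_product, jobs))

print(jobs)
print(results)
```

Materializing the jobs in a list matters because collapse_product returns a one-shot iterator: once it has been consumed, mapping over it again yields nothing.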
    

variables to excel

Tools for exporting directories of results & a readme into single excel files

import os, glob

import numpy as np
import pandas as pd
import click

import docutils.frontend
import docutils.nodes
import docutils.parsers.rst
import docutils.utils

SHEET_FORMATS = {}


def parse_rst(text):
    parser = docutils.parsers.rst.Parser()
    components = (docutils.parsers.rst.Parser,)

    settings = (
        docutils
        .frontend
        .OptionParser(components=components)
        .get_default_values())

    document = docutils.utils.new_document('<rst-doc>', settings=settings)
    parser.parse(text, document)

    return document


def rst_walker(sheet_writer, section, level=0, row=0, col=0):
    if not hasattr(section, 'children'):
        sheet_writer.write(row, col, section.astext().strip('\n'))
        row += 1
        return row

    for block in section.children:
        if hasattr(block, 'tagname') and (block.tagname == 'title'):
            row += 1
            fmt = SHEET_FORMATS.get(level, None)
            sheet_writer.write(row, col, block.astext().strip('\n'), fmt)
            row += 1

        elif (
                hasattr(block, 'tagname')
                and (
                    (block.tagname == 'paragraph')
                    or (block.tagname == '#text')
                    or (level > 3))):

            fmt = SHEET_FORMATS.get('text', None)
            sheet_writer.write(
                row,
                col,
                block.astext().strip('\n').replace('\n', ' '),
                fmt)

            row += 1

        elif hasattr(block, 'tagname') and (block.tagname == 'bullet_list'):
            fmt = SHEET_FORMATS.get('text', None)
            for li in block.children:
                sheet_writer.write(
                    row,
                    col,
                    '• ' + li.astext().strip('\n').replace('\n', ' '),
                    fmt)
                row += 1

        elif hasattr(block, 'tagname') and (block.tagname == 'enumerated_list'):
            fmt = SHEET_FORMATS.get('text', None)
            for i, li in enumerate(block.children):
                sheet_writer.write(
                    row,
                    col,
                    '{}. '.format(i + 1) + li.astext().strip('\n').replace('\n', ' '),
                    fmt)
                row += 1

        else:
            row = rst_walker(sheet_writer, block, level + 1, row=row, col=col)

    return row


def readme_to_excel(readme_path, excel_writer, sheet_name='README', start_row=2, start_col=1):

    with open(readme_path, 'r') as f:
        doc = parse_rst(f.read())

    workbook  = excel_writer.book
    worksheet = excel_writer.book.add_worksheet(sheet_name)
    worksheet.set_column(1, 2, 60.)

    SHEET_FORMATS.update({
        1: workbook.add_format({
            'bold': True,
            'text_wrap': False,
            'valign': 'top',
            'font_size': 18}),
        2: workbook.add_format({
            'bold': True,
            'text_wrap': False,
            'valign': 'top',
            'font_size': 14}),
        3: workbook.add_format({
            'bold': True,
            'text_wrap': False,
            'valign': 'top',
            'font_size': 12}),
        'text': workbook.add_format({
            'bold': False,
            'text_wrap': True,
            'valign': 'top',
            'font_size': 11})})

    rst_walker(worksheet, doc, row=start_row, col=start_col)


def variable_to_excel(varname, root_dir, agglev, by='by_', scen=False, geog_cols=1, file_var=None):

    writer = pd.ExcelWriter(
        os.path.join(root_dir, f'{by}{agglev}', f'{file_var}_{agglev}.xlsx'),
        engine='xlsxwriter')

    option1 = os.path.join(
        root_dir,
        f'{by}{agglev}',
        f'{file_var}_README.txt')

    option2 = os.path.join(root_dir, f'{by}{agglev}', 'readme.txt')

    readme_to_excel(
        option1 if os.path.isfile(option1) else option2,
        writer)

    hist_file = os.path.join(
        root_dir,
        f'{by}{agglev}',
        f'{file_var}_{agglev}_historical_1970-1990.csv')

    if os.path.isfile(hist_file):


        hist_data = pd.read_csv(hist_file).rename_axis('_INDEX')
        hist_data = hist_data.set_index(
            list(np.array(list(hist_data.columns))[list(range(geog_cols))]),
            append=True)

        hist = pd.concat(
            {'historical': pd.concat(
                {'1970-1990': pd.concat(
                    {'observed': hist_data},
                     names=['likelihood'])},
                names=['period'])},
            names=['scenario'])

        (
            hist
            .xs('1970-1990', level='period')
            .unstack(['scenario', 'likelihood'])
            .reset_index('_INDEX', drop=True)
            .to_excel(writer, sheet_name='1970-1990'))

    dfs = {}
    periods = ['2010-2030', '2020-2040', '2040-2060', '2060-2080', '2080-2100']

    if scen:
        rcps = ['expected-emissions', 'high-emissions']
    else:
        rcps = ['rcp45', 'rcp85']

    for rcp in rcps:
        dfp = {}
        for period in periods:
            fp = os.path.join(
                root_dir,
                f'{by}{agglev}',
                f'{file_var}_{agglev}_{rcp}_{period}.csv')

            if os.path.isfile(fp):
                df = pd.read_csv(fp)
                df = df.set_index(pd.Index(
                    np.hstack([np.arange(len(df)//5) for _ in range(5)]),
                    name='_INDEX'))

                df = df.set_index(
                    list(df.columns.values[list(range(geog_cols + 1))]),
                    append=True)

                dfp[period] = df
            else:
                print('nooop: {}'.format(fp))

        if len(dfp) > 0:
            dfs[rcp] = pd.concat(dfp, names=['period'])

    proj = pd.concat(dfs, names=['scenario'])

    proj.index.set_names('likelihood', level='quantile', inplace=True)

    for period in proj.index.get_level_values('period').unique():
        (
            proj
            .xs(period, level='period')
            .unstack(['scenario', 'likelihood'])
            .reset_index('_INDEX', drop=True)
            .to_excel(writer, sheet_name=period))

    # ExcelWriter.save() was removed in pandas 2.0; close() writes the file
    writer.close()


@click.command()
@click.argument('varname')
@click.argument('root_dir')
@click.argument('agglev')
@click.option('--by', default='by_', help='Optional prefix for agglev (default "by_{AGGLEV}")')
@click.option('--scen/--no-scen', default=False, is_flag=True, help=(
    'Use scenario names (e.g. high-emissions). Default is '
     'to use rcp names (e.g. rcp85). This should reflect the '
     'input file names/contents... the output will match the '
     'inputs.'))
@click.option('--geog_cols', default=1, help='number of index columns to read in geography (default 1)', type=int)
@click.option('--file-var', default=None, help='varname used in csv file names (default VARNAME)')
def to_excel(varname, root_dir, agglev, by='by_', scen=False, geog_cols=1, file_var=None):
    '''
    Converts a standard Rhodium Climate Risk Service output csv+readme
    directory into a single excel file.

    Accepts as arguments the variable name (used in filenames), root directory
    of data (containing agglev directories), and the regional aggregation level
    (e.g. county, cbsa)

    Example usage:

        python build_excel.py total-economic-impact-as-share-of-GDP . cbsa

    '''
    if file_var is None:
        file_var = varname

    variable_to_excel(
        varname, root_dir, agglev, by=by, scen=scen, geog_cols=geog_cols, file_var=file_var)


if __name__ == "__main__":
    to_excel()

Add a "make gcs fuse directories in place" function

import os

from tqdm.auto import tqdm


def add_fuse_directory_markers_to_cloud_storage(client, bucket_name, root_path="", pbar=True):
    """
    Create gcsfuse directory markers from a bucket and root path

    Parameters
    -----------
    client : google.cloud.storage.Client
        See the [google.cloud.storage.Client](https://googleapis.dev/python/storage/latest/client.html) docs for help setting this up.
    bucket_name : str
        name of the bucket on gcs
    root_path : str, optional
        prefix of "directories" below which to create the directory markers
    pbar : bool, optional
        whether to display a tqdm progress bar while listing blobs (default True)

    Examples
    ---------

    The following will create directory markers for all directories within gs://my-bucket/path/to/root,
    where directories are indicated by the presence of blobs with directory separators (`'/'`) in the
    path. Empty directories will not be created, since these cannot exist on google cloud storage.

    .. code-block:: python

        >>> client = google.cloud.storage.Client.from_service_account_json('/path/to/cred.json')
        >>> add_fuse_directory_markers_to_cloud_storage(client, 'my-bucket', 'path/to/root/')

    """
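    # look up the bucket by name; `bucket` was previously undefined
    bucket = client.bucket(bucket_name)
    blobs = bucket.list_blobs(prefix=root_path)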
    pages = blobs.pages
    if pbar:
        progress_bar = tqdm(pages)
        total_items = 0

    directories = set()

    for page in pages:
        if pbar:
            total_items += page.num_items
            progress_bar.total = total_items
            progress_bar.refresh()

        for blob in page:
            if pbar:
                progress_bar.update()

            dirname = os.path.dirname(blob.name).rstrip("\\/") + "/"

            if dirname not in directories:
                dir_blob = bucket.blob(dirname)
                if not dir_blob.exists():
                    dir_blob.upload_from_string(b"")

            directories.add(dirname)

    if pbar:
        progress_bar.close()
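
The docstring above describes directories as being implied by blob names that contain `'/'`. That naming logic can be separated from the cloud calls and checked on its own. The following is a minimal sketch, not part of rhg_compute_tools: `directory_markers` is a hypothetical helper, and it assumes that every ancestor directory below `root_path` should get a marker (the function above only marks each blob's immediate parent).

```python
import posixpath


def directory_markers(blob_names, root_path=""):
    """Return the sorted 'dir/' marker names implied by a list of blob names.

    Hypothetical helper for illustration only. Blobs sitting directly in
    root_path contribute no marker, and nothing above root_path is returned.
    """
    root = root_path.strip("/")
    prefix = root + "/" if root else ""
    markers = set()

    for name in blob_names:
        parent = posixpath.dirname(name)
        # walk up the tree, stopping at the root prefix
        while parent and parent != root and parent.startswith(prefix):
            markers.add(parent + "/")
            parent = posixpath.dirname(parent)

    return sorted(markers)


names = ["path/to/root/a/b/file.csv", "path/to/root/top.csv"]
print(directory_markers(names, "path/to/root/"))
```

Each returned name could then be uploaded as an empty blob, as the function above does with `dir_blob.upload_from_string(b"")`.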

Setting environment variables on remote cluster does not work

  • RHG Compute Tools version: 0.2.1
  • Python version: 3.7.3
  • Operating System: Linux-4.14.138+-x86_64-with-debian-buster-sid

Description

I'm trying to set an environment variable on a remote cluster. It appears that the dictionary implementation assigning values to names is slightly off (values() should be items()). Also, the handling for the deprecated list-of-dicts implementation has a typo (isintance should be isinstance) (I am submitting a PR to resolve both issues).

What I Did

client, cluster = rhg_compute_tools.kubernetes.get_standard_cluster(
    env_items = {
        'FOO':'/path/to/bar'
    }
)

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-2-18c512dee399> in <module>
      1 client, cluster = rhg_compute_tools.kubernetes.get_standard_cluster(
      2     env_items = {
----> 3         'FOO':'/path/to/bar'
      4     }
      5 )

~/rhg_compute_tools/rhg_compute_tools/kubernetes.py in get_standard_cluster(*args, **kwargs)
    316     """
    317 
--> 318     return get_cluster(*args, scaling_factor=1, **kwargs)
    319 
    320 

~/rhg_compute_tools/rhg_compute_tools/kubernetes.py in get_cluster(name, extra_pip_packages, extra_conda_packages, memory_gb, nthreads, cpus, cred_name, cred_path, env_items, scaling_factor, dask_config_dict, deploy_mode, idle_timeout, template_path, extra_worker_labels, extra_pod_tolerations, keep_default_tolerations, **kwargs)
    221             [
    222                 container["env"].append({"name": k, "value": v})
--> 223                 for k, v in env_items.values()
    224             ]
    225         # allow deprecated passing of list of name/value pairs

~/rhg_compute_tools/rhg_compute_tools/kubernetes.py in <listcomp>(.0)
    221             [
    222                 container["env"].append({"name": k, "value": v})
--> 223                 for k, v in env_items.values()
    224             ]
    225         # allow deprecated passing of list of name/value pairs

ValueError: too many values to unpack (expected 2)
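
The traceback follows from iterating over `env_items.values()`: each value is a single string, so unpacking it into `k, v` fails unless the string happens to have exactly two characters. A minimal sketch of the corrected conversion is below. `env_items_to_container_env` is a hypothetical standalone function written for illustration, not the code in `kubernetes.py`, and the handling of the deprecated list form is an assumption based on the comment visible in the traceback.

```python
def env_items_to_container_env(env_items):
    """Convert env_items into a list of {"name": ..., "value": ...} dicts.

    Hypothetical helper illustrating the fix described in this issue.
    """
    if isinstance(env_items, dict):
        # items() yields (name, value) pairs; values() yields only the values
        return [{"name": k, "value": v} for k, v in env_items.items()]

    if isinstance(env_items, (list, tuple)):
        # deprecated form (assumed): already a list of name/value dicts
        return [dict(item) for item in env_items]

    raise TypeError("env_items must be a dict or a list of name/value dicts")


print(env_items_to_container_env({"FOO": "/path/to/bar"}))
```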

add ability to inject arbitrary labels and tolerations to pod template

Allow specifying pod labels & tolerations. Right now this must be done with a pod template, e.g.:

metadata:
  labels:
    key1: val1
    key2: val2
spec:
  tolerations:
  - effect: "NoSchedule"
    key: "k8s.dask.org_dedicated"
    operator: "Equal"
    value: "worker-highcpu"
  - effect: "NoSchedule"
    key: "k8s.dask.org/dedicated"
    operator: "Equal"
    value: "worker-highcpu"
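
One way this could work is to merge user-supplied labels and tolerations into the pod template after it has been loaded as a dictionary. The sketch below is only an illustration under that assumption: `add_labels_and_tolerations` and its parameters are hypothetical names, and the template is treated as a plain nested dict mirroring the YAML above.

```python
import copy


def add_labels_and_tolerations(pod_template, labels=None, tolerations=None):
    """Return a copy of pod_template with extra labels/tolerations merged in.

    Hypothetical helper; pod_template is assumed to be a dict parsed from YAML.
    """
    template = copy.deepcopy(pod_template)

    if labels:
        # create metadata.labels if missing, then add or overwrite keys
        template.setdefault("metadata", {}).setdefault("labels", {}).update(labels)

    if tolerations:
        # create spec.tolerations if missing, then append the new entries
        template.setdefault("spec", {}).setdefault("tolerations", []).extend(tolerations)

    return template


base = {"spec": {"containers": []}}
toleration = {
    "effect": "NoSchedule",
    "key": "k8s.dask.org/dedicated",
    "operator": "Equal",
    "value": "worker-highcpu",
}
print(add_labels_and_tolerations(base, {"key1": "val1"}, [toleration]))
```

Working on a deep copy keeps the default template untouched, so repeated calls with different labels do not accumulate state.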

mpl 3.4 drops `font_manager._rebuild()` function and breaks `design` imports

Description

Our CI pipeline just broke and it's b/c matplotlib just released v3.4.0. In this version there is no more _rebuild() function within the matplotlib.font_manager module. We call this function here:

from matplotlib.font_manager import _rebuild

It looks like the font loading lines above L129 are all commented out, so do we need this anymore? It wasn't immediately clear to me why matplotlib took it out or what the proper way to rebuild the font cache is now, but if we don't need it anymore we could remove that line. Until then, it's blocking as it breaks our CI

@delgadom do you know if we still need that _rebuild call?
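
If the call turns out to still be useful on older matplotlib versions, one stopgap is to look the function up defensively instead of importing it unconditionally. This is a minimal sketch of that idea, not a statement about what the `design` module should do: `optional_attr` is a hypothetical helper, and the matplotlib usage is shown only in comments.

```python
import importlib


def optional_attr(module_name, attr_name):
    """Return module.attr if both exist, otherwise None.

    Hypothetical helper for guarding imports of private or removed functions.
    """
    try:
        module = importlib.import_module(module_name)
    except ImportError:
        return None
    return getattr(module, attr_name, None)


# Intended use (assumption): only rebuild the font cache where supported.
# _rebuild = optional_attr("matplotlib.font_manager", "_rebuild")
# if _rebuild is not None:
#     _rebuild()

print(optional_attr("os.path", "join") is not None)
```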

Improve cluster management tools

Goals

Given a list of futures (partially completed) and a cluster, should be easy to:

  • Get the state, run duration, and any exceptions
  • Get the worker holding/processing the task
  • Get the pod the worker is running on
  • Execute arbitrary kubernetes commands on the pod and return the result

These should be tested on RHG hub, and, if stable/useful, submitted as PRs to dask_kubernetes.

What we have so far

class FutureNotFoundError(KeyError):
    pass

class PodNotFoundError(KeyError):
    pass

def get_worker_ip_from_future(future, cluster):
    try:
        return list(cluster.scheduler.who_has[future.key])[0]
    except KeyError:
        pass

    for w in cluster.scheduler.workers.values():
        if (future in w.processing) or (future.key in [t.key for t in w.has_what]):
            return w.address
    
    raise FutureNotFoundError('task {} not found in cluster tasks'.format(future))

def get_pod_from_ip(ip, cluster):
    ip = ip.split('://')[-1].split(':')[0]
    
    for p in cluster.pods():
        if p.status.pod_ip == ip:
            return p

    raise PodNotFoundError('No pod found with IP address {}'.format(ip))

These functions are admittedly very hacky. There have to be faster ways of doing this, but I don't see them in the dask_kubernetes docs.

The next steps of executing things on the pods could be done from the command line, but using the kubernetes api would obviously be more elegant. Pods returned by get_pod_from_ip have a bunch of useful info. It's possible we could get the appropriately configured/authenticated kubernetes client from the dask cluster or client and execute things on this. The kubernetes package has some examples of calling exec using the kubernetes api.
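
As a starting point for the command-line route, the sketch below builds a `kubectl exec` invocation for a given pod and runs it with `subprocess`. It is a rough illustration only: both function names are hypothetical, it assumes `kubectl` is installed and authenticated in the calling environment, and it assumes the pod object exposes `metadata.name` and `metadata.namespace`.

```python
import subprocess


def kubectl_exec_args(pod_name, namespace, command):
    """Build the argument list for `kubectl exec` on one pod (hypothetical helper)."""
    return ["kubectl", "exec", "-n", namespace, pod_name, "--"] + list(command)


def exec_on_pod(pod_name, namespace, command):
    """Run a command on a pod and return its stdout.

    Assumes kubectl is on the PATH and already configured for the cluster.
    """
    result = subprocess.run(
        kubectl_exec_args(pod_name, namespace, command),
        capture_output=True,
        text=True,
        check=True,  # raise CalledProcessError on a non-zero exit code
    )
    return result.stdout


# Example (not run here; "dask-worker-1" is a made-up pod name):
print(kubectl_exec_args("dask-worker-1", "test-hub", ["ls", "/"]))
```

Combined with the functions above, a call might look like `exec_on_pod(pod.metadata.name, pod.metadata.namespace, ["ls", "/"])`, where `pod` comes from `get_pod_from_ip`.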

add a blocking_pbar

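# `dd` is assumed to be dask.distributed, which provides as_completed
import dask.distributed as dd

# forward compatibility
try:
    from tqdm.auto import tqdm
except ImportError:
    from tqdm import tqdm_notebook as tqdm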

# progress bar that blocks until completion
def blocking_pbar(futures, **kwargs):
    pbar = tqdm(dd.as_completed(futures), total=len(futures), **kwargs)
    for _ in pbar:
        errors = [f for f in futures if f.status == 'error']
        if len(errors) > 0:
            pbar.set_postfix({'errors': len(errors)})

Allow grouped options in collapse_product

Currently, collapse_product only allows combinatorial products of lists of options. But we frequently have associated groups of options, e.g.:

rcp_related = (
    [
        {
            'rcp': 'historical',
            'model': 'obs'}]
    + [{'rcp': 'rcp85', 'model': m} for m in rcp85_models]
    + [{'rcp': 'rcp45', 'model': m} for m in rcp45_models])

There's not a good way to do this right now with collapse_product
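
One possible shape for this, sketched under the assumption that each group is a list of keyword dicts like `rcp_related` above: take one dict from every group and merge them. `grouped_product` is a hypothetical function, not an existing part of rhg_compute_tools, and the model names in the example are placeholders.

```python
import itertools


def grouped_product(*groups):
    """Yield one merged kwargs dict per combination of grouped options.

    Each group is a list of dicts whose keys vary together; one dict is taken
    from each group and the dicts are merged (later groups win on key clashes).
    """
    for combo in itertools.product(*groups):
        merged = {}
        for options in combo:
            merged.update(options)
        yield merged


rcp_related = [
    {"rcp": "historical", "model": "obs"},
    {"rcp": "rcp85", "model": "model-a"},  # placeholder model name
]
years = [{"year": 2020}, {"year": 2030}]

for kwargs in grouped_product(rcp_related, years):
    print(kwargs)
```

An ordinary list of independent options can still be expressed as a group of single-key dicts, as with `years` here.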

get rid of lambdas in xarray utils

lambda functions in xarray utils (e.g. dataarray_from_futures) are difficult to read, understand, and debug. make them real functions, Mike!

enforce isort and black in tests

In order to keep things clean, we could add the following commands to our tests

isort --profile black -c .
black --check .

isort just makes imports slightly easier to follow (orders them by type of package and alphabetically, splits them into groups, and deletes duplicate imports). Not a big deal but why not make things a little cleaner.

add alternative combining methods to rct.xarray.*_from_delayed

Paraphrased from @delgadom in #84:

It would be cool to combine our *_from_delayed rhg_compute_tools.xarray functions with the new xarray combine functions so you could combine based on coords or auto-combine. Or just drop the dataarray_ and dataset_ from delayed functions, provide only the dataarrays_ and datasets_ functions, and point the users to these concat functions.

Workflow would just be:

futures = [ ... ] # flat list of dataarray futures with arbitrary non-overlapping coordinate relationships
da = xr.combine_by_coords(rhgx.dataarrays_from_delayed(futures))

futures = [[...], [...], ...] # nested list of dataarray futures with hierarchical structures
da = xr.combine_nested(rhgx.dataarrays_from_delayed(futures))

or even, if you want terrible performance and just don't care...

futures = [ ... ] # ordered flat list of dataarray futures with overlapping coordinate relationships
da = functools.reduce(lambda x, y: x.combine_first(y), rhgx.dataarrays_from_delayed(futures))

Drop pytest-runner use

From https://pypi.org/project/pytest-runner/:

pytest-runner depends on deprecated features of setuptools and relies on features that break security mechanisms in pip. For example ‘setup_requires’ and ‘tests_require’ bypass pip --require-hashes. See also pypa/setuptools#1684.

It is recommended that you:

  • Remove ‘pytest-runner’ from your ‘setup_requires’, preferably removing the setup_requires option.
  • Remove ‘pytest’ and any other testing requirements from ‘tests_require’, preferably removing the setup_requires option.
  • Select a tool to bootstrap and then run tests such as tox

I don't think dropping this would radically change the package, or how tests are run. It's just a matter of carefully removing it and everything created for it.

allow for flags in cp_to_gcs

for instance -P which allows for persisting the modified times as gsutil is copying files. This matters for geoclaw plotting tools, annoyingly. I'll submit a PR for this.
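
A minimal sketch of how optional flags might be threaded through to the gsutil call is below. The real `cp_to_gcs` signature and the exact gsutil command it builds are not shown in this issue, so `build_gsutil_cp_args`, its `flags` parameter, and the base `gsutil -m cp -r` command are all assumptions made for illustration.

```python
def build_gsutil_cp_args(src, dest, flags=None):
    """Build an argument list for a recursive gsutil copy (hypothetical helper).

    flags is an optional list of extra cp flags, e.g. ["-P"].
    """
    args = ["gsutil", "-m", "cp", "-r"]  # assumed base command
    args.extend(flags or [])
    args.extend([src, dest])
    return args


print(build_gsutil_cp_args("my_data_dir", "gs://my-bucket/my_data_dir", flags=["-P"]))
```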

Add command line gcs tools

would be great to do something along the lines of

rhg-compute-tools gcs rsync src dst

Which suggests we probably need a broader command line strategy for handling different submodules
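
To make the idea concrete, here is a rough sketch of a nested command layout where each submodule owns its own group of subcommands. It uses `argparse` from the standard library only to stay dependency-free (the existing excel tool uses click, which has an equivalent group mechanism). The parser layout and the `build_parser` name are assumptions, not an existing interface.

```python
import argparse


def build_parser():
    """Build a two-level CLI: rhg-compute-tools <submodule> <command> ..."""
    parser = argparse.ArgumentParser(prog="rhg-compute-tools")
    submodules = parser.add_subparsers(dest="submodule", required=True)

    # one sub-parser per submodule; only gcs is sketched here
    gcs = submodules.add_parser("gcs", help="google cloud storage utilities")
    gcs_commands = gcs.add_subparsers(dest="command", required=True)

    rsync = gcs_commands.add_parser("rsync", help="sync a source to a destination")
    rsync.add_argument("src")
    rsync.add_argument("dst")

    return parser


args = build_parser().parse_args(["gcs", "rsync", "src", "dst"])
print(args.submodule, args.command, args.src, args.dst)
```

Other submodules (for example kubernetes) could register their own sub-parsers in the same way without touching the gcs commands.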
