
graphchain's People

Contributors

cbyrohl, floriango, gield, lsorber, multimeric, rcambier, zgornel


graphchain's Issues

Attribute Error from dask 1.1.1

When running your dask delayed decorator example, I hit this error:

Traceback (most recent call last):
  File "./test3.py", line 37, in <module>
    print(result.compute())
  File "/nix/store/60l4c79ba9ss8hd9qpah85rh0kmgbyzh-python3.6-dask-1.1.1/lib/python3.6/site-packages/dask/base.py", line 156, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/nix/store/60l4c79ba9ss8hd9qpah85rh0kmgbyzh-python3.6-dask-1.1.1/lib/python3.6/site-packages/dask/base.py", line 395, in compute
    dsk = collections_to_dsk(collections, optimize_graph, **kwargs)
  File "/nix/store/60l4c79ba9ss8hd9qpah85rh0kmgbyzh-python3.6-dask-1.1.1/lib/python3.6/site-packages/dask/base.py", line 194, in collections_to_dsk
    for opt, (dsk, keys) in groups.items()]))
  File "/nix/store/60l4c79ba9ss8hd9qpah85rh0kmgbyzh-python3.6-dask-1.1.1/lib/python3.6/site-packages/dask/base.py", line 194, in <listcomp>
    for opt, (dsk, keys) in groups.items()]))
  File "/nix/store/1fqfcjjrka11b6np7qgcwsk8ld6bcq2w-python3.6-graphchain-1.0.0/lib/python3.6/site-packages/graphchain/core.py", line 346, in optimize
    dsk = dsk.copy()
AttributeError: 'HighLevelGraph' object has no attribute 'copy'

This was after I fixed some incorrect parameter names.

The code to achieve this was:

import dask
import graphchain
import pandas as pd

@dask.delayed(pure=True)
def create_dataframe(num_rows, num_cols):
    print('Creating DataFrame...')
    return pd.DataFrame(data=[range(num_cols)]*num_rows)

@dask.delayed(pure=True)
def create_dataframe2(num_rows, num_cols):
    print('Creating DataFrame...')
    return pd.DataFrame(data=[range(num_cols)]*num_rows)

@dask.delayed(pure=True)
def complicated_computation(df, num_quantiles):
    print('Running complicated computation on DataFrame...')
    return df.quantile(q=[i / num_quantiles for i in range(num_quantiles)])

@dask.delayed(pure=True)
def summarise_dataframes(*dfs):
    print('Summing DataFrames...')
    return sum(df.sum().sum() for df in dfs)

df_a = create_dataframe(10_000, 1000)
df_b = create_dataframe2(10_000, 1000)
df_c = complicated_computation(df_a, 2048)
df_d = complicated_computation(df_b, 2048)
result = summarise_dataframes(df_c, df_d)

with dask.config.set(scheduler='sync', delayed_optimize=graphchain.optimize):
    print(result.compute())
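The traceback points at graphchain calling .copy() on dask's newer HighLevelGraph, which (unlike a plain dict) has no .copy() method. As a workaround, one could convert the graph to a plain dict first; a minimal sketch, assuming dask.utils.ensure_dict is available and that graphchain.optimize accepts the resulting dict:

import dask
import graphchain
from dask.utils import ensure_dict

def optimize_with_plain_dict(dsk, keys=None, **kwargs):
    # Hypothetical wrapper: HighLevelGraph has no .copy(), so convert
    # it to a plain dict before handing it to graphchain.
    return graphchain.optimize(ensure_dict(dsk), keys=keys, **kwargs)

# Usage: register the wrapper instead of graphchain.optimize.
# with dask.config.set(scheduler='sync', delayed_optimize=optimize_with_plain_dict):
#     print(result.compute())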

Add support for non-pure computations

In Dask, whether or not you set pure to true affects how the computational graph is constructed. In fact, enabling pure allows dask to optimise before constructing the graph.

I've noticed that graphchain appears to assume all computational nodes are pure, and caches them. But if some nodes are not pure, they shouldn't be cached.
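For example, a hypothetical non-pure task like the one below should never be served from the cache, because its output is expected to change between runs:

import random

import dask

# Hypothetical non-pure task: pure=False tells dask the output can
# differ between calls, so graphchain should not cache it.
@dask.delayed(pure=False)
def draw_sample():
    return random.random()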

Applying this to `Future`s

I have a new issue. The readme/tests only describe how to use graphchain with the delayed API or the dask collections, but I'm wondering how I can do so if I'm using Futures, e.g. using client.submit(func). Are these Futures just delayed behind the scenes, so I can use the delayed optimizer hook? If not, is there any way to apply graphchain to them?
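For concreteness, a sketch of the pattern being asked about (my_func is a placeholder):

import dask.distributed

def my_func(x):
    return x + 1

client = dask.distributed.Client()
# Futures are submitted eagerly, one call at a time, so it is not
# obvious where a graph-level hook like delayed_optimize would run.
future = client.submit(my_func, 1)
print(future.result())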

Storing results to an arbitrary MutableMapping

It would be nice to support the MutableMapping interface for storage as well. This would allow users to design a wrapper around their desired storage format as long as it supplies the MutableMapping interface.
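A minimal sketch of such a wrapper (hypothetical DirectoryStore class; any object implementing this interface would work the same way):

import os
import pickle
from collections.abc import MutableMapping

class DirectoryStore(MutableMapping):
    """Hypothetical backend that pickles each cache entry to a file."""

    def __init__(self, path):
        self.path = path
        os.makedirs(path, exist_ok=True)

    def _file(self, key):
        return os.path.join(self.path, key)

    def __getitem__(self, key):
        try:
            with open(self._file(key), 'rb') as f:
                return pickle.load(f)
        except FileNotFoundError:
            raise KeyError(key)

    def __setitem__(self, key, value):
        with open(self._file(key), 'wb') as f:
            pickle.dump(value, f)

    def __delitem__(self, key):
        try:
            os.remove(self._file(key))
        except FileNotFoundError:
            raise KeyError(key)

    def __iter__(self):
        return iter(os.listdir(self.path))

    def __len__(self):
        return len(os.listdir(self.path))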

A robust trace of all operations performed to execute a node.

When executing a node in a hashed graph, an option should exist that allows returning, along with the result, a trace of all operations performed for all dependencies to reach the result (e.g. loads, executions + stores, etc.).

The simplest structure could be a {key: operation} or {key: [operation, [operations_of_dependencies]]} table.
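Illustrative values only, for a graph where c depends on b and b depends on a (operation names taken from graphchain's log output):

# Flat form: {key: operation}
flat_trace = {'a': 'LOAD', 'b': 'LOAD', 'c': 'EXEC-STORE'}

# Nested form: {key: [operation, [operations_of_dependencies]]}
nested_trace = {
    'c': ['EXEC-STORE', [
        {'b': ['LOAD', [
            {'a': ['LOAD', []]},
        ]]},
    ]],
}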

Fix a few smaller issues

  • Replace HLG monkey patching with dask's ensure_dict
  • Generation of API documentation fails
  • Delayed example in the README uses incorrect keyword args

Optimisation hook for array/bag/dataframe

I just realised that the current hook we use to insert graphchain into dask is delayed_optimize, which only applies to "raw" Delayed instances, not to e.g. the DataFrame API, which is a very popular part of dask. As described in the dask documentation, these collection types have their own optimisation hooks that we would need to implement (see the sketch after this list). I will look into writing one for the DataFrame API; I think we should be able to integrate it fairly easily into the current codebase.

  • array_optimize
  • dataframe_optimize
  • bag_optimize
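If these hooks accept the same signature as delayed_optimize, registering graphchain for every collection might look like this (a sketch; it assumes graphchain.optimize can handle each collection's graph, which is exactly what this issue is about):

import dask
import graphchain

# Sketch: register graphchain on each collection's optimization hook,
# by analogy with the existing delayed_optimize registration.
with dask.config.set(
    delayed_optimize=graphchain.optimize,
    array_optimize=graphchain.optimize,
    dataframe_optimize=graphchain.optimize,
    bag_optimize=graphchain.optimize,
):
    ...  # build and compute collections here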

Task graph key values are not interpolated

The values of keys are not correctly retrieved when the value is a key reference. Hashing is also broken: it returns the hash of the string itself rather than the hash of the referenced key (in the example below, graphchain.funcutils.get_hash('b', ...) != graphchain.funcutils.get_hash(1, ...)).

import graphchain

dsk = {'a': 1, 'b': 'a', 'c': 'b'}

graphchain.get(dsk, 'c')
# 2018-05-30 11:21:57,162 - graphchain.funcutils - INFO - Creating the cache directory ...
# 2018-05-30 11:21:57,171 - graphchain.funcutils - INFO - Creating a new hash-chain file graphchain.json
# 2018-05-30 11:21:57,183 - graphchain.funcutils - INFO - * [a] (hash=fe750c1e1b562ba7c4c20fcb345e16b8)
# 2018-05-30 11:21:57,183 - graphchain.funcutils - DEBUG - `--> HASH MISS: src=NONE, arg=NONE dep=NONE has 0 candidates.
# 2018-05-30 11:21:57,184 - graphchain.funcutils - INFO - * [b] (hash=f2cd799a97dfb522a45ba33e5c16bbac)
# 2018-05-30 11:21:57,184 - graphchain.funcutils - DEBUG - `--> HASH MISS: src=  OK, arg=MISS dep=  OK has 1 candidates.
# 2018-05-30 11:21:57,184 - graphchain.funcutils - INFO - * [c] (hash=4e9d3211321733b10db94cb52eabd8e1)
# 2018-05-30 11:21:57,185 - graphchain.funcutils - DEBUG - `--> HASH MISS: src=  OK, arg=MISS dep=  OK has 2 candidates.
# 2018-05-30 11:21:57,185 - graphchain.graphchain - DEBUG - --- GraphChain Optimization complete ---
# 2018-05-30 11:21:57,193 - graphchain.funcutils - INFO - * [key=c constant=<class 'str'>] EXEC-STORE (hash=4e9d3211321733b10db94cb52eabd8e1)
# 'b'  # <-- Output

graphchain.get(dsk, 'b')
# 2018-05-30 11:22:11,745 - graphchain.funcutils - INFO - * [a] (hash=fe750c1e1b562ba7c4c20fcb345e16b8)
# 2018-05-30 11:22:11,745 - graphchain.funcutils - DEBUG - `--> HASH MISS: src=  OK, arg=MISS dep=  OK has 1 candidates.
# 2018-05-30 11:22:11,745 - graphchain.funcutils - INFO - * [b] (hash=f2cd799a97dfb522a45ba33e5c16bbac)
# 2018-05-30 11:22:11,745 - graphchain.funcutils - DEBUG - `--> HASH MISS: src=  OK, arg=MISS dep=  OK has 2 candidates.
# 2018-05-30 11:22:11,770 - graphchain.graphchain - DEBUG - --- GraphChain Optimization complete ---
# 2018-05-30 11:22:11,771 - graphchain.funcutils - INFO - * [key=b constant=<class 'str'>] EXEC-STORE (hash=f2cd799a97dfb522a45ba33e5c16bbac)
# 'a'  # <-- Output

Does graphchain make sense for persisting partial computations of xarrays as zarr?

Hi there!

I'm trying to evaluate graphchain for our usecase, and I'm not sure I fully understand the model. I'd appreciate help clarifying, if possible!

We have some relatively simple dask computations, backed by climate-science zarr datastores, from which we eventually want to compute location-specific metrics. At any given time/compute step, only a small subset of these metrics will be necessary. Ideally, we'd like to persist and cache any subset of the metrics we've computed, but we don't really want to precompute and cache metrics for the entire world.

As I understand it: graphchain will examine the dask compute graph, identify nodes as caching opportunities, and persist them. I think this means that if a subset operation happens late in the compute graph, then dask moves that subset op back up the compute graph, and graphchain then starts by looking at the already subsetted computation graph?

Graphchain then persists this computed node, by default as a joblib dump, but you can write custom serializers that do different things, for example when it sees a dask dataframe. So... what happens if I use to_zarr in a custom serializer? Are the dataframes we're talking about here already subsetted, or would calling to_zarr on them force them and compute the entire globe's worth of metrics? The latter can't be the case; to_parquet would do the same... so does the key then include the chunk being persisted? Are those chunks the same as zarr chunks, if only by accident because our input is a zarr datastore?

I get the feeling that I'm not quite understanding something here. I'll be running some experiments of my own to try and figure out what graphchain is doing, but I figured I should just as well also ask the question!

Decouple unit tests

Some tests depend on the side effects of other tests (e.g., first_run -> second_run). These should be decoupled so that running any test in isolation is a valid test.
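One way to decouple them, sketched below with pytest's tmp_path fixture (this assumes the cache location is configurable per test, e.g. via graphchain's location argument):

import pytest

@pytest.fixture
def cache_dir(tmp_path):
    # A fresh cache per test, so no test can observe hits
    # produced by an earlier test's side effects.
    return str(tmp_path / '__graphchain_cache__')

def test_first_run(cache_dir):
    ...  # compute once against cache_dir and assert cache misses

def test_second_run(cache_dir):
    ...  # compute twice against cache_dir and assert cache hits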

AttributeError: 'Blockwise' object has no attribute 'mapping'

Hi,
I was trying to use graphchain on Zarr files. I import graphchain and set the dask config to use graphchain's optimize (screenshot omitted).

I was trying to compute this array, b (screenshot omitted).

And I passed it to a delayed operator (screenshot omitted).

I get an AttributeError: 'Blockwise' object has no attribute 'mapping' when trying to compute (screenshot omitted).

Looking at the graph layers, it looks like there is a Blockwise layer, which lacks a .mapping attribute (screenshot omitted).

Was wondering if I am doing something wrong or perhaps this type of Dask array is not supported? Any help is appreciated.

Add an environment.yml that specifies graphchain's dependencies

And mark optional dependencies too.

Example environment.yml:

name: graphchain-env
channels:
  - defaults
dependencies:
  - dask=0.17
  - fs-s3fs=0.1  # Optional? Dependency on `fs` already part of this package.
  - joblib=0.11
  - pip:
    - lz4~=1.1.0  # No recent version on anaconda.org.

Recursive source code retrieval

In principle we should hash the function's source and its dependencies' source as well. Currently we're doing what Joblib is doing: hashing the main function's source only.
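A minimal sketch of what recursive retrieval could look like, assuming dependencies are module-level functions reachable through the function's globals (hypothetical helper, not graphchain's current behaviour):

import inspect

def recursive_source(func, seen=None):
    # Collect the source of func plus that of any functions it
    # references by name; the concatenation is what would be hashed.
    seen = seen if seen is not None else set()
    if func in seen:
        return ''
    seen.add(func)
    source = inspect.getsource(func)
    for name in func.__code__.co_names:
        dep = func.__globals__.get(name)
        if inspect.isfunction(dep):
            source += recursive_source(dep, seen)
    return source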

graphchain fails with dask.distributed

graphchain (1.1.0) fails to work with dask.distributed (2.21.0). An exception is raised when pickle or cloudpickle attempt to serialize CachedComputation.

The following code reproduces the problem:

import operator

import dask
import dask.distributed
import graphchain

def main():
    cluster = dask.distributed.LocalCluster(
        n_workers=2, processes=True, threads_per_worker=1
    )

    d = dask.delayed(operator.add)(3, 5)

    with dask.distributed.Client(address=cluster):
        with dask.config.set(delayed_optimize=graphchain.optimize):
            d.compute()  # raises TypeError: can't pickle _thread.RLock objects

if __name__ == "__main__":
    main()

I'm not sure if this is a bug, user error, or it is expected behaviour and graphchain is simply not expected to work with dask.distributed.

Use with non-synchronous scheduler

So in the readme we are told to use dask.config.set(scheduler="sync"), but I guess this means that tasks can no longer run in parallel, which loses a lot of the advantages of dask.

When I remove this and use a parallel scheduler, I get cannot pickle '_thread.RLock' object when it tries to dump the CachedComputation object. Is there an easy workaround here, like using a multiprocessing.Lock?
