radix-ai / graphchain
⚡️ An efficient cache for the execution of dask graphs.
License: MIT License
When running your dask delayed decorator example, I hit this error:
Traceback (most recent call last):
File "./test3.py", line 37, in <module>
print(result.compute())
File "/nix/store/60l4c79ba9ss8hd9qpah85rh0kmgbyzh-python3.6-dask-1.1.1/lib/python3.6/site-packages/dask/base.py", line 156, in compute
(result,) = compute(self, traverse=False, **kwargs)
File "/nix/store/60l4c79ba9ss8hd9qpah85rh0kmgbyzh-python3.6-dask-1.1.1/lib/python3.6/site-packages/dask/base.py", line 395, in compute
dsk = collections_to_dsk(collections, optimize_graph, **kwargs)
File "/nix/store/60l4c79ba9ss8hd9qpah85rh0kmgbyzh-python3.6-dask-1.1.1/lib/python3.6/site-packages/dask/base.py", line 194, in collections_to_dsk
for opt, (dsk, keys) in groups.items()]))
File "/nix/store/60l4c79ba9ss8hd9qpah85rh0kmgbyzh-python3.6-dask-1.1.1/lib/python3.6/site-packages/dask/base.py", line 194, in <listcomp>
for opt, (dsk, keys) in groups.items()]))
File "/nix/store/1fqfcjjrka11b6np7qgcwsk8ld6bcq2w-python3.6-graphchain-1.0.0/lib/python3.6/site-packages/graphchain/core.py", line 346, in optimize
dsk = dsk.copy()
AttributeError: 'HighLevelGraph' object has no attribute 'copy'
This was after I fixed some incorrect parameter names.
The code to achieve this was:
import dask
import graphchain
import pandas as pd

@dask.delayed(pure=True)
def create_dataframe(num_rows, num_cols):
    print('Creating DataFrame...')
    return pd.DataFrame(data=[range(num_cols)] * num_rows)

@dask.delayed(pure=True)
def create_dataframe2(num_rows, num_cols):
    print('Creating DataFrame...')
    return pd.DataFrame(data=[range(num_cols)] * num_rows)

@dask.delayed(pure=True)
def complicated_computation(df, num_quantiles):
    print('Running complicated computation on DataFrame...')
    return df.quantile(q=[i / num_quantiles for i in range(num_quantiles)])

@dask.delayed(pure=True)
def summarise_dataframes(*dfs):
    print('Summing DataFrames...')
    return sum(df.sum().sum() for df in dfs)

df_a = create_dataframe(10_000, 1000)
df_b = create_dataframe2(10_000, 1000)
df_c = complicated_computation(df_a, 2048)
df_d = complicated_computation(df_b, 2048)
result = summarise_dataframes(df_c, df_d)

with dask.config.set(scheduler='sync', delayed_optimize=graphchain.optimize):
    print(result.compute())
In Dask, whether you set pure to true or not affects how the computational graph is constructed. In fact, enabling pure allows dask to optimise prior to constructing the graph.
I've noticed that graphchain appears to assume all computational nodes are pure, and caches them. But if some nodes are not pure, they shouldn't be cached.
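To illustrate the distinction, here is a hypothetical sketch (not dask's actual tokenize machinery) of how a scheduler could name tasks: pure tasks get a deterministic key derived from the function and its arguments, so identical calls collapse to one node and are safe to cache; impure tasks get a fresh key per call, so they must never be deduplicated or served from cache.

```python
import hashlib
import uuid

def task_key(func, args, pure):
    """Hypothetical task-naming helper: deterministic keys for pure
    tasks (cacheable), unique keys for impure tasks (never cacheable)."""
    if pure:
        # Same function + same arguments -> same key -> cache hit possible.
        token = hashlib.sha256(repr((func.__name__, args)).encode()).hexdigest()[:16]
    else:
        # A fresh token per call: the task is re-executed every time.
        token = uuid.uuid4().hex[:16]
    return f"{func.__name__}-{token}"
```

A cache like graphchain can only be correct for keys produced by the first branch; respecting the second branch is what this issue asks for.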
I have a new issue. The readme/tests only describe how to use graphchain with the delayed API or the dask collections, but I'm wondering how I can do so if I'm using Futures, e.g. using client.submit(func). Are these Futures just delayed behind the scenes, so I can use the delayed optimizer hook? If not, is there any way to apply graphchain to them?
It would be nice to support the MutableMapping interface for storage as well. This would allow users to design a wrapper around their desired storage format, as long as it supplies the MutableMapping interface.
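A minimal in-memory sketch of such a backend, assuming only that graphchain would accept any MutableMapping as its storage (a real backend would serialize values to S3, a database, etc.):

```python
from collections.abc import MutableMapping

class DictStorage(MutableMapping):
    """Toy storage backend satisfying the MutableMapping interface.
    Implementing the five abstract methods is all a custom wrapper
    would need; get(), items(), update() etc. come for free."""

    def __init__(self):
        self._data = {}

    def __getitem__(self, key):
        return self._data[key]

    def __setitem__(self, key, value):
        self._data[key] = value

    def __delitem__(self, key):
        del self._data[key]

    def __iter__(self):
        return iter(self._data)

    def __len__(self):
        return len(self._data)
```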
With CircleCI or Travis. Look into code coverage too if we have time.
When executing a node in a hashed graph, an option should exist that allows returning, along with the result, a trace of all operations performed for all dependencies to reach the result (i.e. loads, executions+stores, etc.). The simplest structure could be a {key: operation} or {key: [operation, [operations_of_dependencies]]} table.
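A sketch of the simpler {key: operation} variant, using a toy recursive executor over a dask-style graph dict (hypothetical; graphchain's real execution path differs):

```python
def run_with_trace(dsk, key, cache, trace):
    """Execute `key` in a tiny dask-style graph, recording per key
    whether its value was loaded from cache or executed and stored."""
    if key in cache:
        trace[key] = "load"
        return cache[key]
    task = dsk[key]
    if isinstance(task, tuple) and callable(task[0]):
        # A task tuple: (func, *dependency_keys).
        func, deps = task[0], task[1:]
        result = func(*(run_with_trace(dsk, d, cache, trace) for d in deps))
    else:
        result = task  # a literal value
    cache[key] = result
    trace[key] = "exec+store"
    return result
```

Running the same key twice against a shared cache shows the trace flipping from "exec+store" to "load", which is exactly the information this issue asks to surface.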
ensure_dict
I just realised that the current hook that we use to insert graphchain into dask is delayed_optimize, which only applies to "raw" Delayed instances, but doesn't apply to e.g. the DataFrame API, which is a very popular part of dask. As described here, these types have their own optimisation hooks that we would need to implement. I will look into writing one for the DataFrame API. I think we should be able to integrate it fairly easily into the current codebase.
array_optimize
dataframe_optimize
bag_optimize
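Registering graphchain for all collection types might then look like the following sketch (array_optimize, dataframe_optimize and bag_optimize are dask's real config hooks; whether graphchain.optimize works unmodified for each collection type is exactly what needs investigating):

```python
import dask
import graphchain

# Register graphchain's optimizer for every collection type,
# not just for raw Delayed objects.
with dask.config.set(
    scheduler="sync",
    delayed_optimize=graphchain.optimize,
    array_optimize=graphchain.optimize,
    dataframe_optimize=graphchain.optimize,
    bag_optimize=graphchain.optimize,
):
    ...  # compute dask arrays / dataframes / bags here
```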
The values of keys are not correctly retrieved when the value is a key reference. Hashing is also broken, as it does not return the hash of the referenced key but the hash of the string itself (in the example below, graphchain.funcutils.get_hash('b', ...) != graphchain.funcutils.get_hash(1, ...)).
import graphchain
dsk = {'a':1, 'b': 'a', 'c': 'b'}
graphchain.get(dsk, 'c')
# 2018-05-30 11:21:57,162 - graphchain.funcutils - INFO - Creating the cache directory ...
# 2018-05-30 11:21:57,171 - graphchain.funcutils - INFO - Creating a new hash-chain file graphchain.json
# 2018-05-30 11:21:57,183 - graphchain.funcutils - INFO - * [a] (hash=fe750c1e1b562ba7c4c20fcb345e16b8)
# 2018-05-30 11:21:57,183 - graphchain.funcutils - DEBUG - `--> HASH MISS: src=NONE, arg=NONE dep=NONE has 0 candidates.
# 2018-05-30 11:21:57,184 - graphchain.funcutils - INFO - * [b] (hash=f2cd799a97dfb522a45ba33e5c16bbac)
# 2018-05-30 11:21:57,184 - graphchain.funcutils - DEBUG - `--> HASH MISS: src= OK, arg=MISS dep= OK has 1 candidates.
# 2018-05-30 11:21:57,184 - graphchain.funcutils - INFO - * [c] (hash=4e9d3211321733b10db94cb52eabd8e1)
# 2018-05-30 11:21:57,185 - graphchain.funcutils - DEBUG - `--> HASH MISS: src= OK, arg=MISS dep= OK has 2 candidates.
# 2018-05-30 11:21:57,185 - graphchain.graphchain - DEBUG - --- GraphChain Optimization complete ---
# 2018-05-30 11:21:57,193 - graphchain.funcutils - INFO - * [key=c constant=<class 'str'>] EXEC-STORE (hash=4e9d3211321733b10db94cb52eabd8e1)
# 'b' # <-- Output
graphchain.get(dsk,'b')
# 2018-05-30 11:22:11,745 - graphchain.funcutils - INFO - * [a] (hash=fe750c1e1b562ba7c4c20fcb345e16b8)
# 2018-05-30 11:22:11,745 - graphchain.funcutils - DEBUG - `--> HASH MISS: src= OK, arg=MISS dep= OK has 1 candidates.
# 2018-05-30 11:22:11,745 - graphchain.funcutils - INFO - * [b] (hash=f2cd799a97dfb522a45ba33e5c16bbac)
# 2018-05-30 11:22:11,745 - graphchain.funcutils - DEBUG - `--> HASH MISS: src= OK, arg=MISS dep= OK has 2 candidates.
# 2018-05-30 11:22:11,770 - graphchain.graphchain - DEBUG - --- GraphChain Optimization complete ---
# 2018-05-30 11:22:11,771 - graphchain.funcutils - INFO - * [key=b constant=<class 'str'>] EXEC-STORE (hash=f2cd799a97dfb522a45ba33e5c16bbac)
# 'a' # <-- Output
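The expected behaviour can be sketched as a helper that follows chains of key references until a non-key value is reached (hypothetical helper; graphchain would need the equivalent logic both when retrieving values and when computing hashes):

```python
def resolve(dsk, key):
    """Follow key references ('c' -> 'b' -> 'a' -> 1) in a dask-style
    graph dict until a value that is not itself a key is reached."""
    value = dsk[key]
    while isinstance(value, str) and value in dsk:
        value = dsk[value]
    return value
```

With this logic, graphchain.get(dsk, 'c') on the example above would return 1 rather than 'b', and the hash for 'b' would be the hash of the resolved value 1.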
Hi there!
I'm trying to evaluate graphchain for our use case, and I'm not sure I fully understand the model. I'd appreciate help clarifying, if possible!
We have some relatively simple dask computations, backed by climate-science zarr datastores, from which we eventually want to compute location-specific metrics. At any given time/compute step, only a small subset of these metrics will be necessary. Ideally, we'd like to persist and cache any subset of these metrics we've computed, but we don't really want to precompute and cache metrics for the entire world.
As I understand it: graphchain will examine the dask compute graph, identify nodes as caching opportunities, and persist them. I think this means that if a subset operation happens late in the compute graph, then dask moves that subset op back up the compute graph, and graphchain then starts by looking at the already subsetted computation graph?
Graphchain then persists this computed node, by default as a joblib dump, but you can write custom serializers that do different things if it sees a dask dataframe. So... what happens if I use to_zarr in a custom serializer? Are the dataframes we're talking about here already subsetted, or would calling to_zarr on them force them and compute the entire globe's worth of metric? The latter can't be the case; to_parquet would do the same... so does the key then include the chunk being persisted? Are those chunks the same as zarr chunks, if only by accident because our input is a zarr datastore?
I get the feeling that I'm not quite understanding something here. I'll be running some experiments of my own to try and figure out what graphchain is doing, but I figured I should just as well also ask the question!
Some tests depend on the side effects of other tests (e.g., first_run -> second_run). These should be decoupled so that running any test in isolation is a valid test.
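One way to decouple them is to give every test its own empty cache directory, so no test can observe files left behind by another. A stdlib unittest sketch (if the suite uses pytest instead, the tmp_path fixture achieves the same thing):

```python
import tempfile
import unittest

class CacheTestCase(unittest.TestCase):
    """Base class giving each test a private, freshly created cache
    directory, removed again after the test finishes."""

    def setUp(self):
        self._tmpdir = tempfile.TemporaryDirectory()
        self.cachedir = self._tmpdir.name  # point graphchain's cache here

    def tearDown(self):
        self._tmpdir.cleanup()

    def test_placeholder(self):
        # A real test would run a graph twice against self.cachedir and
        # assert on cache hits/misses -- omitted here.
        self.assertTrue(self.cachedir)
```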
Hi,
I was trying to implement this on Zarr files. I import graphchain and set the dask config to use graphchain's optimize.
I was trying to compute this array, b-
And I passed it to a delayed operator-
I get an error when trying to compute-
Looking at the graph layers, it looks like there is a Blockwise layer (which lacks a .mapping method).
Was wondering if I am doing something wrong or perhaps this type of Dask array is not supported? Any help is appreciated.
And mark optional dependencies too.
Example environment.yml:
name: graphchain-env
channels:
  - defaults
dependencies:
  - dask=0.17
  - fs-s3fs=0.1  # Optional? Dependency on `fs` already part of this package.
  - joblib=0.11
  - pip:
      - lz4~=1.1.0  # No recent version on anaconda.org.
In principle we should hash the function's source and its dependencies' source as well. Currently we're doing what Joblib is doing: hashing only the main function's source.
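The recursive variant could look roughly like the sketch below. It hashes bytecode rather than source text (a proxy that avoids inspect.getsource; graphchain's real implementation may differ) and recurses into the functions a function references, so a change in a helper invalidates the caller's hash:

```python
import hashlib

def code_hash(func, _seen=None):
    """Hash a function's bytecode together with that of the functions it
    references via globals or closure cells. Hypothetical sketch of
    hashing a function and its dependencies recursively."""
    if _seen is None:
        _seen = set()
    if func in _seen:  # guard against mutual recursion
        return ""
    _seen.add(func)
    h = hashlib.sha256(func.__code__.co_code)
    # Objects the function refers to: module-level names and closure cells.
    deps = [func.__globals__.get(n) for n in func.__code__.co_names]
    deps += [c.cell_contents for c in (func.__closure__ or ())]
    for dep in deps:
        if hasattr(dep, "__code__"):
            h.update(code_hash(dep, _seen).encode())
    return h.hexdigest()
```

Two callers with identical bodies but different helpers now hash differently, which is the behaviour this issue asks for.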
graphchain (1.1.0) fails to work with dask.distributed (2.21.0). An exception is raised when pickle or cloudpickle attempts to serialize CachedComputation.
The following code reproduces the problem:
import operator

import dask.delayed
import dask.distributed
import graphchain

def main():
    cluster = dask.distributed.LocalCluster(
        n_workers=2, processes=True, threads_per_worker=1)
    d = dask.delayed(operator.add)(3, 5)
    with dask.distributed.Client(address=cluster):
        with dask.config.set(delayed_optimize=graphchain.optimize):
            d.compute()  # raises TypeError: can't pickle _thread.RLock objects

if __name__ == "__main__":
    main()
I'm not sure if this is a bug, user error, or expected behaviour and graphchain is simply not expected to work with dask.distributed.
We could either install the patch (see blogpost) or adjust the code to write in chunks (see stackoverflow).
So in the readme we are told to dask.config.set(scheduler="sync"), but I guess this means that tasks can no longer run in parallel, which loses a lot of the advantages of dask. When I remove this and use a parallel scheduler, I get cannot pickle '_thread.RLock' object when it tries to dump the CachedComputation object. Is there an easy workaround here, like using a multiprocessing.Lock?
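One common fix for this class of error is to exclude the lock from the pickled state and recreate it after unpickling. A sketch of what that could look like (hypothetical; graphchain's actual CachedComputation holds more state than shown here):

```python
import pickle
import threading

class CachedComputation:
    """Sketch: drop the unpicklable RLock when pickling and recreate
    it on the worker after unpickling."""

    def __init__(self, key):
        self.key = key
        self._lock = threading.RLock()

    def __getstate__(self):
        state = self.__dict__.copy()
        del state["_lock"]  # _thread.RLock objects cannot be pickled
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        self._lock = threading.RLock()  # fresh lock in the new process
```

Each process then gets its own lock, which is usually what is wanted when the lock only guards in-process state; if the lock must be shared across processes, a different synchronization mechanism would be needed.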