square / bionic
A Python framework for data science.
Home Page: https://bionic.readthedocs.io
License: Apache License 2.0
(This is a bit of an edge case, since it requires switching versioning modes. The only time I've encountered it was while trying to prepare a demo of the different versioning modes. But it'd be nice to figure out if there's an easy fix.)
Assisted versioning mode is intended to catch user errors by detecting when the user has updated a function's code but not its version. However, it can't do this for artifacts that were computed in the manual versioning mode. Here's an example as a failing test:
def test_indirect_versioning_mode_transition(builder):
    builder.assign('x', 2)
    @builder
    def y():
        return 3
    @builder
    def f(x, y):
        return x + y
    assert builder.build().get('f') == 5
    @builder  # noqa: F811
    def y():
        return 4
    # Since we're using manual versioning, it makes sense that we get a
    # stale value here.
    assert builder.build().get('f') == 5
    builder.set('core__versioning_mode', 'assist')
    # Ideally we'd get an error here, but we don't (the test fails).
    with pytest.raises(CodeVersioningError):
        builder.build().get('f')
This is because we (1) load f from the cache, (2) don't notice that the bytecode has changed (because we're in manual mode), and (3) update the cache to indicate that f's value exactly satisfied what we wanted, which (4) prevents any future bytecode checks for that particular combination of artifact and code. I'm not 100% sure, but it may be possible to skip step (3) in manual mode, which should fix this.
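To make the proposed fix concrete, here is a toy model of the cache-lookup logic. It assumes a simplified cache entry that records which bytecode hashes have been verified to produce its value; `CacheEntry`, `lookup`, and `verified_bytecode` are hypothetical names, not Bionic's actual internals. The key point is that manual mode returns the value without recording anything (skipping step 3), so a later assisted-mode lookup still performs the bytecode check.

```python
from dataclasses import dataclass, field
from typing import Set


class CodeVersioningError(Exception):
    """Local stand-in for Bionic's error type, to keep the sketch self-contained."""


@dataclass
class CacheEntry:
    # Hypothetical cache record: the value, plus the bytecode hashes that
    # have been confirmed to produce exactly this value.
    value: object
    verified_bytecode: Set[str] = field(default_factory=set)
    original_bytecode: str = ""


def lookup(entry: CacheEntry, current_bytecode: str, mode: str) -> object:
    """Return a cached value, checking bytecode according to the versioning mode."""
    if mode == "manual":
        # Step (2): no bytecode check. We also skip step (3) and do NOT
        # record current_bytecode as verified, since we never checked it.
        return entry.value
    if current_bytecode in entry.verified_bytecode:
        return entry.value
    if current_bytecode != entry.original_bytecode:
        raise CodeVersioningError("code changed but its version was not updated")
    # Step (3) is only safe here, because we actually compared the bytecode.
    entry.verified_bytecode.add(current_bytecode)
    return entry.value
```

Under this model, the final `get` in the failing test would raise, because the manual-mode load never marked the new bytecode as verified.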
Currently Bionic's internals use a concept of a "provider" object which contains the logic for computing each entity value. This class (or family of classes) handles two categories of things:
The fact that a provider is not 1:1 with an entity suggests that we should separate these two concepts. In fact, as we add non-entity descriptors, we'll soon want to apply transformations that aren't associated with any entity at all. (For example, constructing or deconstructing a tuple.)
Furthermore, providers are structured in such a way that they can be incrementally transformed by decorator functions. This means that during the decoration process, a provider can have an incomplete or invalid configuration, with the expectation that it will be cleaned up or rejected by the final decorator prior to being added to a Flow. This need to support both clean and unclean states makes the provider interface and data model even more complicated.
To address these two issues, I propose splitting out two additional classes:
An EntityDefinition class, which acts as the source of truth for entity properties. This object is immutable and always in a clean, valid state.
A DecorationAccumulator class, which holds the messy entity configuration data built up by Bionic decorators. When a decorated function is incorporated into a flow, this accumulator will be validated, normalized, and converted into a provider and one or more EntityDefinitions.
The provider concept will remain, but it will only be responsible for computing values by applying transformations.
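A minimal sketch of the proposed split, assuming Python dataclasses. The fields shown (`name`, `protocol`, `doc`, `persist`) and the `to_definitions` method are illustrative guesses rather than a committed design: the accumulator is mutable and may be temporarily incomplete, while `EntityDefinition` is frozen and validated when it is constructed.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple


@dataclass(frozen=True)
class EntityDefinition:
    """Immutable source of truth for one entity's properties."""
    name: str
    protocol: Optional[object] = None
    doc: Optional[str] = None
    persist: bool = True

    def __post_init__(self):
        # Always clean: invalid states are rejected at construction time.
        if not self.name.isidentifier():
            raise ValueError(f"invalid entity name: {self.name!r}")


@dataclass
class DecorationAccumulator:
    """Mutable, possibly incomplete state built up by decorators."""
    output_names: List[str] = field(default_factory=list)
    protocol: Optional[object] = None
    persist: Optional[bool] = None  # None means "not specified by any decorator"

    def to_definitions(self, doc: Optional[str] = None) -> Tuple[EntityDefinition, ...]:
        # Validate and normalize once, when the function is added to a flow.
        if not self.output_names:
            raise ValueError("no outputs were declared")
        persist = True if self.persist is None else self.persist
        return tuple(
            EntityDefinition(name, self.protocol, doc, persist)
            for name in self.output_names
        )
```

Each decorator would only touch the accumulator; nothing downstream ever sees it in its unvalidated state.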
Entities should have docstrings which are visible to users of the flow object. If an entity is defined by a function, it should inherit that function's docstring; otherwise the docstring should probably be settable when the entity is declared or assigned.
When we change Bionic's caching code, we run the risk of making backwards-incompatible changes, creating a situation where the current version of Bionic can't load cached data from older versions. A recent example of this is the bug fixed here. Our current tests can't catch these errors because they both save and load data using the latest code.
One way to catch these errors automatically is to write a set of tests that attempt to load data from a fixed, checked-in set of cached files. This will force us to explicitly update these files when we make a non-backwards-compatible change. (If we do make such a change, we should bump the CACHE_SCHEMA_VERSION constant -- to avoid trying to load the old files at all -- and the minor version number.)
In the documentation here: https://bionic.readthedocs.io/en/stable/concepts.html#defining-multiple-outputs-using-decorators, it should be mentioned that functions with multiple outputs (i.e. which use @bn.outputs), such as split_name, cannot be called explicitly: flow.get.split_name() is not allowed. Doing so leads to an error: bionic.exception.UndefinedEntityError: "Entity 'split_name' is not defined". A consequence of this is that such functions cannot be at the end of the pipeline (they cannot be terminal functions); they need to be intermediate steps in the pipeline.
Right now, Bionic users can see the progress of their computation by watching the printed logs. However, the stuff we want to log isn't exactly the same as the stuff users want to see. For logs it's good to be verbose, to make debugging and performance checking easier (e.g., if a task might take a long time, it's good to log the exact start and end times), but that produces a lot of log output that doesn't need to be shown in real time.
Also, for interactive use we don't have to restrict ourselves to append-only text. On a terminal we can update text in place to produce progress meters, etc. In a notebook we have even more options. tqdm might be an interesting place to start here.
Bionic's file cache grows continuously over time, and it's difficult to manually remove older files. We should have a facility for automating this. Presumably it would detect either files that aren't needed for the current flow, or files that haven't been accessed recently.
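A minimal sketch of the "not accessed recently" heuristic, using only the standard library. It assumes the cache is a plain local directory tree; `stale_cache_files` and `max_age_days` are hypothetical names. Because many filesystems are mounted with `noatime` or `relatime`, access times may not be updated reliably, so the sketch uses the later of the access and modification times.

```python
import time
from pathlib import Path
from typing import List, Optional


def stale_cache_files(
    cache_dir: str, max_age_days: float, now: Optional[float] = None
) -> List[Path]:
    """Return files under cache_dir not accessed or modified in max_age_days."""
    now = time.time() if now is None else now
    cutoff = now - max_age_days * 24 * 60 * 60
    stale = []
    for path in Path(cache_dir).rglob("*"):
        if path.is_file():
            info = path.stat()
            # atime may be stale on some mounts, so take the later timestamp.
            last_used = max(info.st_atime, info.st_mtime)
            if last_used < cutoff:
                stale.append(path)
    return sorted(stale)
```

Deleting is then a matter of calling `path.unlink()` on each result; printing the list as a dry run first seems prudent. Detecting files that aren't needed for the current flow would require knowledge of Bionic's cache layout and isn't covered here.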
Sometimes GCS file uploads (and presumably downloads) can time out (stack trace attached below). For most of these operations we use the GCS Python API rather than gsutil, so it's probably not retrying by default. We should probably add some retry logic to reduce the chance of transient failures crashing the whole process.
File "/usr/local/lib/python3.7/site-packages/bionic/cache.py", line 284, in _blob_from_file
self._cloud.upload(file_path, blob_url)
File "/usr/local/lib/python3.7/site-packages/bionic/cache.py", line 601, in upload
self._tool.blob_from_url(url).upload_from_filename(str(path))
File "/usr/local/lib/python3.7/site-packages/google/cloud/storage/blob.py", line 1320, in upload_from_filename
predefined_acl=predefined_acl,
File "/usr/local/lib/python3.7/site-packages/google/cloud/storage/blob.py", line 1265, in upload_from_file
client, file_obj, content_type, size, num_retries, predefined_acl
File "/usr/local/lib/python3.7/site-packages/google/cloud/storage/blob.py", line 1175, in _do_upload
client, stream, content_type, size, num_retries, predefined_acl
File "/usr/local/lib/python3.7/site-packages/google/cloud/storage/blob.py", line 1122, in _do_resumable_upload
response = upload.transmit_next_chunk(transport)
File "/usr/local/lib/python3.7/site-packages/google/resumable_media/requests/upload.py", line 425, in transmit_next_chunk
retry_strategy=self._retry_strategy,
File "/usr/local/lib/python3.7/site-packages/google/resumable_media/requests/_helpers.py", line 136, in http_request
return _helpers.wait_and_retry(func, RequestsMixin._get_status_code, retry_strategy)
File "/usr/local/lib/python3.7/site-packages/google/resumable_media/_helpers.py", line 150, in wait_and_retry
response = func()
File "/usr/local/lib/python3.7/site-packages/google/auth/transport/requests.py", line 287, in request
**kwargs
File "/usr/local/lib/python3.7/site-packages/google/auth/transport/requests.py", line 110, in __exit__
raise self._timeout_error_type()
requests.exceptions.Timeout
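A minimal sketch of retry-with-exponential-backoff, written as a generic wrapper so it could surround a call like the `upload_from_filename` call in the trace above. The trace shows the timeout propagating out through `wait_and_retry`, so whatever retrying happens there didn't cover this case. `with_retries`, the attempt count, and the delays are illustrative assumptions; in practice we'd pass the exception types we consider transient, such as `requests.exceptions.Timeout`.

```python
import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def with_retries(
    func: Callable[[], T],
    transient: Tuple[Type[BaseException], ...],
    max_attempts: int = 5,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying on transient errors with exponential backoff."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except transient:
            if attempt == max_attempts:
                raise  # Give up and surface the original error.
            # Wait 1x, 2x, 4x, ... base_delay, plus jitter so that many
            # workers don't all retry at the same moment.
            delay = base_delay * 2 ** (attempt - 1)
            sleep(delay + random.uniform(0, delay / 2))
    raise AssertionError("unreachable")  # The loop always returns or raises.
```

A call site might look like `with_retries(lambda: blob.upload_from_filename(str(path)), transient=(requests.exceptions.Timeout,))`. Errors not listed as transient propagate immediately.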
My flows have been using quite a lot of memory. Instead of disabling all in-memory caching, is there an easy way to monitor the memory usage of each flow component?
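One rough way to measure this yourself is the standard library's `tracemalloc`: wrap an entity function so that it records the peak allocation made during its call. `track_peak_memory` and the `peaks` dict are hypothetical helpers, not part of Bionic. `tracemalloc` may not see memory allocated by some native extensions, tracing adds noticeable overhead, and nested tracked calls will disturb each other's peaks. Whether Bionic's decorators behave well with a wrapped function like this hasn't been checked here.

```python
import functools
import tracemalloc
from typing import Callable, Dict

# Function name -> peak bytes allocated during its most recent call.
peaks: Dict[str, int] = {}


def track_peak_memory(func: Callable) -> Callable:
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        started_here = not tracemalloc.is_tracing()
        if started_here:
            tracemalloc.start()
        before, _ = tracemalloc.get_traced_memory()
        tracemalloc.reset_peak()  # Requires Python 3.9+.
        try:
            return func(*args, **kwargs)
        finally:
            _, peak = tracemalloc.get_traced_memory()
            # Record the peak relative to what was already allocated.
            peaks[func.__name__] = peak - before
            if started_here:
                tracemalloc.stop()
    return wrapper
```

Applying `@track_peak_memory` underneath `@builder` on the suspect entities and then inspecting `peaks` after `flow.get(...)` should give a per-function picture.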
Here's how to reproduce:
builder = bn.FlowBuilder('spaces')
builder.set('core__persistent_cache__global_dir', 'dir with spaces')
@builder
def x():
    return 7
flow = builder.build()
flow.get('x')
This raises an exception like this:
InvalidCacheStateError: Cached data may be in an invalid state; this should be impossible but could have resulted from either a bug or a change to the cached files. You should be able to repair the problem by removing all cached files under ('file:///Users/janek/Development/bionic/dir%20with%20spaces/spaces/inventory',).
I suspect this is a bug in the conversion between paths and URLs. It's also suspicious that the path is being reported as a tuple, but I'm not sure if that's a symptom or just a bug in our reporting. I will investigate.
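The error message reports the directory as a percent-encoded URL (`dir%20with%20spaces`), which suggests that the encoded form is somewhere being treated as a literal path, or vice versa. For reference, here is a lossless round trip using only the standard library: `Path.as_uri()` percent-encodes spaces, and `urllib.parse.unquote` decodes them again. The helper names are hypothetical, this hasn't been compared against Bionic's actual conversion code, and it only handles POSIX-style paths.

```python
from pathlib import Path
from urllib.parse import unquote, urlparse


def path_to_url(path: Path) -> str:
    # as_uri() needs an absolute path; it encodes spaces as %20.
    return path.resolve().as_uri()


def url_to_path(url: str) -> Path:
    parsed = urlparse(url)
    if parsed.scheme != "file":
        raise ValueError(f"not a file URL: {url!r}")
    # Decode %20 and other escapes back into the original characters.
    return Path(unquote(parsed.path))
```

The important property is that `url_to_path(path_to_url(p))` gives back the same path even when it contains spaces.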
Here's a failing test:
def test_non_persisted_intermediate(builder):
    builder.assign('x', 2)
    @builder
    @bn.persist(False)
    @count_calls
    def x_plus_one(x):
        return x + 1
    @builder
    @count_calls
    def x_plus_two(x_plus_one):
        return x_plus_one + 1
    flow = builder.build()
    assert flow.get('x_plus_two') == 4
    assert x_plus_one.times_called() == 1
    assert x_plus_two.times_called() == 1
    flow = builder.build()
    assert flow.get('x_plus_two') == 4
    # This should return 0, because we don't need to compute x_plus_one;
    # we should just load x_plus_two from disk.
    # However, it returns 1!
    assert x_plus_one.times_called() == 0
This happens because when we request x_plus_two, Bionic tries to "complete" all the upstream entities, and for a non-persistable entity that means computing it. However, in this case all we need is the hashed value of x_plus_one, which we can figure out without actually computing it. The solution is probably to have two different levels of "completeness": one where just the hash is available, and one where the full result is available.
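One way to picture the two proposed levels, as a toy model: a task state can be complete enough to report its hash (derived from upstream hashes and its own code version, without running the function), or fully computed. `Completeness`, `TaskState`, `ensure_hash`, and `ensure_value` are hypothetical names, not Bionic's internals, and hashing the entity name plus a version string is a simplification.

```python
import enum
import hashlib
from typing import Callable, List, Optional


class Completeness(enum.IntEnum):
    NONE = 0
    HASH_KNOWN = 1   # Enough to look up downstream cache entries.
    VALUE_KNOWN = 2  # The actual value is available in memory.


class TaskState:
    def __init__(self, name: str, func: Callable, deps: List["TaskState"], code_version: str):
        self.name = name
        self.func = func
        self.deps = deps
        self.code_version = code_version
        self.level = Completeness.NONE
        self.hash: Optional[str] = None
        self.value = None

    def ensure_hash(self) -> str:
        # Depends only on upstream hashes and our own code, never on values.
        if self.level < Completeness.HASH_KNOWN:
            parts = [self.name, self.code_version] + [d.ensure_hash() for d in self.deps]
            self.hash = hashlib.sha256("|".join(parts).encode()).hexdigest()
            self.level = Completeness.HASH_KNOWN
        return self.hash

    def ensure_value(self):
        # Only this level actually runs the function (and its ancestors).
        if self.level < Completeness.VALUE_KNOWN:
            self.ensure_hash()
            self.value = self.func(*[d.ensure_value() for d in self.deps])
            self.level = Completeness.VALUE_KNOWN
        return self.value
```

With this split, loading x_plus_two from disk would only need `ensure_hash` on its ancestors, so the non-persisted x_plus_one would never run.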
When serializing a dataframe with duplicate column names, Bionic produces a cryptic error message:
import pandas as pd
import bionic as bn
builder = bn.FlowBuilder('fail')
@builder
def frame():
    return pd.DataFrame(columns=['a', 'b', 'a'], data=[[1, 2, 3]])
print(builder.build().get('frame'))
...
File "/Users/janek/Development/bionic/bionic/protocols.py", line 207, in <listcomp>
if df[col].dtype.name == 'category'
...
AttributeError: 'DataFrame' object has no attribute 'dtype'
This is because df[col] returns a dataframe (not a column, as expected). We should fix this to either serialize the frame correctly or generate a more helpful error message.
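For the "more helpful error message" option, a check like the following could run before serialization. It operates on a plain sequence of column labels, so it works when passed `df.columns` without relying on pandas internals; the function name and message wording are hypothetical. (pandas' own `Index.duplicated()` method could be used instead.)

```python
from collections import Counter
from typing import Hashable, Iterable


def check_unique_columns(columns: Iterable[Hashable], entity_name: str) -> None:
    """Raise a readable error if a dataframe has duplicate column names."""
    counts = Counter(columns)
    duplicates = sorted(str(col) for col, n in counts.items() if n > 1)
    if duplicates:
        raise ValueError(
            f"Can't serialize the dataframe for entity {entity_name!r}: "
            f"it has duplicate column names {duplicates}. "
            "Rename or drop the duplicated columns first."
        )
```

In the example above, `check_unique_columns(df.columns, "frame")` would fail with a message naming column `a` instead of the `AttributeError`.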
It would be nice to be able to disable all persistent caching for an existing flow. This can be useful either to save the time of saving/loading files, or to avoid creating garbage files that need to be cleaned up.
Maybe it would look something like this:
flow.setting('core__persistent_cache__enabled', False).get(...)
Currently the Flow.render_dag() method returns a PNG image, which doesn't support tooltips or any other kind of interaction. However, it would be fairly straightforward to return an SVG image instead, in which case we could include tooltips for each node in the DAG. These tooltips would be a good place to show the docstring of the corresponding entity (once docstring integration is complete).
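Graphviz supports a per-node `tooltip` attribute that is honored in SVG output, so one approach is to emit it in the DOT source. This sketch builds DOT text from a plain edge list and a docstring dict; `dag_to_dot` and its inputs are hypothetical, and it only escapes double quotes and newlines (backslashes are not handled). Rendering would then use Graphviz as usual, e.g. `dot -Tsvg`.

```python
from typing import Dict, List, Tuple


def _quote(text: str) -> str:
    # DOT string literal: escape double quotes and encode newlines as \n.
    return '"' + text.replace('"', '\\"').replace("\n", "\\n") + '"'


def dag_to_dot(edges: List[Tuple[str, str]], docs: Dict[str, str]) -> str:
    """Build Graphviz DOT source whose nodes carry docstring tooltips."""
    nodes = sorted({n for edge in edges for n in edge} | set(docs))
    lines = ["digraph flow {"]
    for node in nodes:
        attrs = f"label={_quote(node)}"
        if docs.get(node):
            # Shown on hover when the graph is rendered as SVG.
            attrs += f", tooltip={_quote(docs[node])}"
        lines.append(f"  {_quote(node)} [{attrs}];")
    for src, dst in edges:
        lines.append(f"  {_quote(src)} -> {_quote(dst)};")
    lines.append("}")
    return "\n".join(lines)
```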
This seems to happen consistently on our CI builds on Python 3.8 (but not earlier versions). Pytest logs an exception message but no tests fail.
Example: https://github.com/square/bionic/pull/149/checks?check_run_id=754300474
Right now, Bionic entity functions can only accept and return in-memory objects. However, in some cases we want to operate directly on files. (For example, we may want to call out to command line tools.) We can sort of replicate this by generating files and passing around paths, but this doesn't integrate with our serialization and caching systems. It would be better if we could indicate that a function's input and/or output correspond to the serialized version of an entity. The second example of @bn.arg in #46 is a good example of how this might work.
When Bionic fails to save or load a cached value, the resulting stack trace is confusing for users because it doesn't indicate (a) that the problem is with the serialization protocol or (b) which entity was responsible. We should catch these exceptions and wrap them with a more helpful message.
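A minimal sketch of the wrapping, using Python's exception chaining (`raise ... from ...`) so the original stack trace is still shown beneath the friendlier message. `SerializationError` and `save_value` are hypothetical names, and `pickle` stands in for whatever protocol the entity actually uses.

```python
import pickle
from typing import BinaryIO


class SerializationError(Exception):
    """Raised when an entity's value can't be written by its protocol."""


def save_value(entity_name: str, protocol_name: str, value: object, file: BinaryIO) -> None:
    try:
        pickle.dump(value, file)  # Stand-in for the entity's real protocol.
    except Exception as e:
        # Name both the entity and the protocol, and chain the original error.
        raise SerializationError(
            f"Failed to serialize the value of entity {entity_name!r} "
            f"using protocol {protocol_name}: {type(e).__name__}: {e}"
        ) from e
```

The loading path could be wrapped the same way, with a message that says which entity's cached file couldn't be read.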
Currently you can install Bionic via Pip, but not Conda. But some people want to use Conda!
In some ways Conda would be nicer because it could also grab dependencies like GraphViz. But we should probably just support both.
Currently, when an entity has both persistence and memoization disabled, we store its value in memory for the duration of the Flow.get() call (as documented here). I think we should change this behavior: instead of storing it in memory, we should recompute the value each time it's used.
I believe the current behavior was chosen because it was easier to implement. However, it seems more intuitive to me that if memoization is disabled, then we don't store the value in memory any longer than necessary. This also matches the intuition of data scientists I've spoken with. (If you're at Square, you can search Slack for the string "it sounds like everyone here would prefer to have it recompute".) Finally, it's slightly confusing that turning off persistence will change the amount of memoization.
This would be a breaking change. Although I doubt many users depend on the current behavior, we should make sure to check in with users before finalizing the change.
When an already-existing entity is defined with a function, its previous declaration (including any protocol, docstring, etc.) is erased. This is intended behavior, but it can be surprising -- especially since it's only mentioned in passing in the docs. (It's also mentioned in the docstring of FlowBuilder.__call__, but that doesn't show up anywhere in our documentation.)
Here's a minimal test that a new user might expect to pass, but doesn't:
def test_declared_protocol_is_kept(builder):
    protocol = bn.protocol.dillable()
    builder.declare("n", protocol=protocol)
    @builder
    def n():
        return 0
    assert builder.build().entity_protocol("n") == protocol
There are a few ways we could improve this:
… assign and set. This would hopefully make it more obvious that the new definition completely erases the previous one.
After computing any entity value, Bionic serializes the value and then hashes it to check if any downstream entities need to be recomputed. However, if the value is a Python set, this hash is not deterministic: we serialize the set by pickling it, which amounts to just iterating through the set and pickling each value, and iteration order on sets is non-deterministic.
We should probably have a special protocol for sets (and frozensets) which does something deterministic, if that's possible. (I'm not sure if it's always possible, since some sets can't be sorted.)
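One approach that sidesteps sortability: pickle each element separately and sort the resulting byte strings, which always have a total order even when the elements don't. This is a sketch assuming each element's own pickle is deterministic (true for simple values like strings, numbers, and tuples of them, but not guaranteed for arbitrary objects, e.g. elements that themselves contain sets); `set_digest` is a hypothetical name.

```python
import hashlib
import pickle
from typing import AbstractSet


def set_digest(values: AbstractSet) -> str:
    """Hash a set or frozenset independently of its iteration order."""
    hasher = hashlib.sha256()
    # Distinguish set from frozenset so they don't collide.
    hasher.update(type(values).__name__.encode())
    # Byte strings are always comparable, even when the elements aren't.
    for chunk in sorted(pickle.dumps(v, protocol=4) for v in values):
        # Length-prefix each chunk so the concatenation is unambiguous.
        hasher.update(len(chunk).to_bytes(8, "big"))
        hasher.update(chunk)
    return hasher.hexdigest()
```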
Running the following fails for me:
pytest --slow --all-execution-modes
After a certain point, one of the worker processes crashes, and then all subsequent parallel tests fail. I think the root error is this:
libc++abi.dylib: terminating with uncaught exception of type std::runtime_error: Couldn't close file
This happens on master, and as far back as v0.8.0. This doesn't happen on CI, obviously, just my laptop.
I suspect we have the same problem as this one, which suggests the problem is an interaction between matplotlib, multiprocessing, and Catalina. If this is the case, we might be able to work around the issue by not running any matplotlib-using tasks in subprocesses on OS X; but I'm not certain that will work, and it's certainly not ideal.
Currently, an entity's value is only recomputed if its inputs change or its code changes. However, sometimes people want entities to be recomputed whenever some amount of time has passed. For example, we may want to download new data from a database each day. Right now the best workaround is like this:
builder.assign('date', datetime.today().strftime('%Y-%m-%d'))
@builder
def current_data(date):
    return download_data()
This is a bit hacky and doesn't work if the flow is long-lived, because date only gets updated when the flow is defined.
Note that the following does not work:
@builder
@bn.persist(False)
def current_data():
    return download_data()
@builder
def summary(current_data):
    return summarize(current_data)
This fails because if we call flow.get('summary') and it finds a cached value, it will return it, because it doesn't know that current_data ought to be recomputed. We need to invalidate not just current_data, but all downstream entities as well.
A better approach would be a way to tell Bionic that (a) an entity should be recomputed every time, and (b) downstream entities should only be recomputed if this entity's output changes. Maybe something like:
@builder
@bn.volatile  # Always recompute
@bn.digestible  # Hash the output to determine whether to recompute children
def date():
    return datetime.today().strftime('%Y-%m-%d')
@builder
def current_data(date):
    return download_data()
I'm not sure about the names, or about needing two separate decorators (although I do think they would be useful independently).
Entities marked with @changes_per_run generate a new cache entry on disk each time they're computed, even though these cached entries are unlikely to ever be used again. We should write those values to a temporary directory instead.
Entity values in Bionic can be accessed in two ways: from another entity definition, or directly from a flow with Flow.get. Currently these two APIs offer different ways to transform and aggregate the values.
When accessing entity values from the definition of another entity, there's a powerful @gather decorator that joins many values of many entities into a single dataframe. When using Flow.get, one can only aggregate the values of a single entity. However, get provides other options for accessing serialized values, which aren't available from within entity definitions.
These discrepancies make certain constructions impossible, and they also make the system harder to learn. In particular, new users often find it difficult to understand the @gather decorator -- I believe this is partly because there's no way to access gathered values directly from a flow, which makes it hard to experiment.
We should move towards a unified API where the same functionality is available in both circumstances. Perhaps something like these:
@builder
@bn.arg('gather_df', ['hyperparams', 'model_type'], also='performance', collection='frame')
def best_performance(gather_df):
    ...
@builder
@bn.arg('big_dataframe_filename', 'big_dataframe', mode='filename')
def big_dataframe_filesize(big_dataframe_filename):
    return os.stat(big_dataframe_filename).st_size
flow.get(['hyperparams', 'model_type'], also='performance', collection='frame')
Currently the helper function bionic.util.init_basic_logging configures the global logging state, with a default level of INFO for all loggers. However, I suspect most users only use this because they want to see Bionic logs. Maybe this function should only set the log level for Bionic loggers.
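A sketch of the narrower behavior with the standard `logging` module: configure only the `bionic` logger and leave the root logger alone. This assumes Bionic's module loggers follow the usual `logging.getLogger(__name__)` convention, so that `bionic` is their common parent; `init_bionic_logging` is a hypothetical name. One trade-off: if the user has also configured a root handler, messages will appear twice unless `propagate` is turned off, which in turn hides Bionic logs from the user's own handlers.

```python
import logging


def init_bionic_logging(level: int = logging.INFO) -> logging.Logger:
    """Show Bionic's log messages without changing other libraries' logging."""
    logger = logging.getLogger("bionic")
    logger.setLevel(level)
    # Avoid stacking duplicate handlers if this is called more than once.
    if not any(getattr(h, "_bionic_handler", False) for h in logger.handlers):
        handler = logging.StreamHandler()
        handler.setFormatter(
            logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s")
        )
        handler._bionic_handler = True  # Marker attribute for the check above.
        logger.addHandler(handler)
    return logger
```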
Right now, when you call Flow.reloading(), you get a new copy of the flow with reloaded state. I think it should also mutate the flow to use the new state.
The current behavior is consistent with the other flow methods, which also return new copies rather than modifying the original flow. However, in this case I think the user will pretty much always want the new state, and I often get caught using an out-of-date version of the flow by accident. Since reloading is already a global, magical operation, I'm okay with being a little inconsistent here.
If a flow spans multiple files (e.g. a flow merges another flow from a different module), reloading may not work correctly.
Possible issues
We may need to implement recursive module reloading.
Currently, Bionic runs all tasks locally, on the same host it was started on. Ideally it would be able to dispatch jobs to other hosts. (Presumably this would depend on some kind of existing cluster system like Dask.) This would allow flows to exploit very high levels of parallelization, and also enable some workloads that require too much memory for a single machine.
We would almost certainly want to implement #43 before this.
In general, almost any exception while reading a cached file will currently be reported as an InvalidCacheStateError, which means the user will be told the cache is corrupted and needs to be cleared. However, this includes things like GCS timing out while copying a local cached file into a bucket (like here), which are not cache corruption problems at all. We may need to be more picky about which exceptions we report this way.
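A sketch of being pickier: translate only errors that plausibly indicate bad cached bytes, and let network and other I/O failures propagate unchanged. Which exception types belong in which bucket is a judgment call; the `CORRUPTION_ERRORS` tuple below is an assumption, and `InvalidCacheStateError` is redefined locally only to keep the example self-contained.

```python
import json
import pickle
from typing import Callable, TypeVar

T = TypeVar("T")


class InvalidCacheStateError(Exception):
    """Local stand-in for Bionic's exception of the same name."""


# Errors that suggest the cached bytes themselves are bad (an assumed list).
CORRUPTION_ERRORS = (pickle.UnpicklingError, EOFError, json.JSONDecodeError, UnicodeDecodeError)


def read_cached(load: Callable[[], T], location: str) -> T:
    try:
        return load()
    except CORRUPTION_ERRORS as e:
        raise InvalidCacheStateError(
            f"Cached data at {location} appears to be corrupted; "
            "removing it should allow the value to be recomputed."
        ) from e
    # Anything else (timeouts, permission errors, ...) propagates as-is,
    # so the user isn't told to clear a cache that is actually fine.
```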
Parallel evaluation of a flow doesn't seem to work when core__persist_by_default = False, even if all entities in the flow (in particular, the ones on the parallel branches of the DAG) are manually persisted (@persist(True)). FWIW I get the same result with both implicit and explicit parallel structure in the flow (e.g. setting multiplicity with values= vs. explicitly defining separate entities for each branch).
Minimal example below (with the following dag):
import bionic as bn
import logging
import time
logging.basicConfig()
logger = logging.getLogger()
logger.setLevel(logging.INFO)
builder = bn.FlowBuilder('my_flow')
builder.set('core__parallel_execution__enabled', True)
builder.set("core__persist_by_default", False)
builder.assign('greeting', 'Hello')
builder.assign('subject', 'world')
@builder
@bn.persist(True)
def subject_slow(subject):
    logger.info('started slow execution')
    start = time.time()
    time.sleep(5)
    end = time.time()
    logger.info(f'ended slow execution in {end - start:0.2f}s')
    return subject
@builder
@bn.persist(True)
def message(greeting, subject_slow):
    return f'{greeting} {subject_slow}!'
@builder
@bn.persist(True)
@bn.gather(over='subject', also='message', into='gdf')
def all_messages(gdf):
    return ' '.join(gdf.message.tolist())
flow = builder.build()
flow.setting('subject', values=['Earth', 'Mars']).get('all_messages')
This flow runs sequentially even though all entities seem to be persisted. It runs in parallel as expected when core__persist_by_default=True.
Could you please document in Warnings and Pitfalls what users should be aware of if they have multiple processes or threads open that are using the same flow?
Currently Bionic computes tasks one at a time. We should be able to use something like multiprocessing to run multiple tasks in parallel (when their dependency structure allows it). This could provide a big speedup for flows using a lot of multiplicity. It's also a first step towards distributed task computation.
There shouldn't be any major architectural obstacles here, since Bionic already manages an explicit stack of tasks. However, there will probably be lots of pickling edge cases to consider.
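A sketch of the scheduling idea with `concurrent.futures`: repeatedly submit every task whose dependencies have finished, then wait for whichever running task completes first. `run_dag` and its plain-dict inputs are hypothetical, not Bionic's task stack. The example works with a thread pool so it runs anywhere; passing a `ProcessPoolExecutor` instead gives real multiprocessing, but then every task function and value must be picklable (lambdas, for instance, are not), which is where the pickling edge cases come in.

```python
from concurrent.futures import FIRST_COMPLETED, Executor, Future, wait
from typing import Callable, Dict, List


def run_dag(
    tasks: Dict[str, Callable],
    deps: Dict[str, List[str]],
    executor: Executor,
) -> Dict[str, object]:
    """Run each task once its dependencies finish, in parallel where possible."""
    results: Dict[str, object] = {}
    running: Dict[Future, str] = {}
    waiting = set(tasks)
    while waiting or running:
        # Submit every task whose inputs are all available.
        ready = [n for n in waiting if all(d in results for d in deps.get(n, []))]
        for name in ready:
            args = [results[d] for d in deps.get(name, [])]
            running[executor.submit(tasks[name], *args)] = name
            waiting.discard(name)
        if not running:
            raise ValueError(f"can't schedule {sorted(waiting)}: cycle or missing dependency")
        # Block until at least one running task finishes, then record results.
        done, _ = wait(running, return_when=FIRST_COMPLETED)
        for future in done:
            results[running.pop(future)] = future.result()
    return results
```

For example, `run_dag(tasks, deps, ThreadPoolExecutor(max_workers=4))` computes independent branches concurrently while still respecting the dependency order.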
Now that we no longer support Python 2, we can remove a bunch of code for handling compatibility.
The FlowBuilder.merge method accepts a flow, not a builder, which seems to be a common source of surprise. What's worse, if you pass it a builder you get a cryptic error message about not having a name attribute.
We should either check the type and raise a clearer exception, or just accept a builder as well.
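A sketch of the "check the type" option. `Flow` and `FlowBuilder` here are empty stand-ins for Bionic's real classes, and the message wording is only a suggestion.

```python
class Flow:
    """Stand-in for Bionic's Flow class."""


class FlowBuilder:
    """Stand-in for Bionic's FlowBuilder class."""

    def merge(self, flow):
        if isinstance(flow, FlowBuilder):
            # The common mistake: point the user at the fix directly.
            raise TypeError(
                "FlowBuilder.merge expects a Flow, but got a FlowBuilder; "
                "try passing builder.build() instead."
            )
        if not isinstance(flow, Flow):
            raise TypeError(f"FlowBuilder.merge expects a Flow, got {type(flow).__name__}")
        ...  # The real merge logic would go here.
```

The alternative, accepting a builder, would amount to calling `.build()` on it in place of the first `raise`.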
Currently, whenever any entity's inputs or code changes, Bionic invalidates its cached value and the values of all downstream entities. However, if after recomputing the entity, we discover that its output value hasn't changed, there's no good reason to recompute all its children too. We should hash the output value of each entity and use that to determine whether to invalidate its children. (As opposed to the current approach, which is to decide this based on a hash of the entity's inputs.)
This will save computation in some flows, and also make it practical to have entities that are recomputed every time. (In the linked issue I suggested a @digestible decorator that indicates that we can hash an entity's output value, but now I think it will be simpler and more intuitive to hash all outputs by default, and offer an opt-out for values that can't be serialized.)
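A toy sketch of output-based invalidation: a child's cache key is built from its own code version plus the digests of its parents' output values, rather than from the parents' inputs. If a parent is recomputed but produces an equal value, the child's key doesn't change and its cached result is reused. The helper names and the in-memory `cache` dict are hypothetical, and hashing a pickle stands in for hashing the serialized value (subject to the same determinism caveat as the set issue above).

```python
import hashlib
import pickle
from typing import Callable, Dict, List


def value_digest(value: object) -> str:
    # Stand-in for hashing the entity's serialized form.
    return hashlib.sha256(pickle.dumps(value, protocol=4)).hexdigest()


def child_cache_key(child_name: str, code_version: str, parent_values: List[object]) -> str:
    """Key a child on its code plus the digests of its inputs' values."""
    parts = [child_name, code_version] + [value_digest(v) for v in parent_values]
    return hashlib.sha256("|".join(parts).encode()).hexdigest()


def get_or_compute(
    cache: Dict[str, object],
    child_name: str,
    code_version: str,
    func: Callable,
    parent_values: List[object],
) -> object:
    key = child_cache_key(child_name, code_version, parent_values)
    if key not in cache:
        cache[key] = func(*parent_values)  # Only runs when an input value changed.
    return cache[key]
```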
Currently the only way to create multiplicity is by setting fixed values with builder.assign or builder.set. This means the number of values can't depend on other values in the system. However, there are many cases where we'd like to generate a range of values dynamically.
For example, we might want to dynamically generate a parameter range:
@builder
@bn.output_sequence
def alpha(min_alpha, max_alpha, n_alpha_samples):
    return np.linspace(min_alpha, max_alpha, n_alpha_samples)
This will probably require some substantial architectural changes.
Thanks for this super helpful package. I was reading the source code and would like to learn how you designed the abstractions. One of my biggest questions is:
Why is it necessary for the Flow object to be immutable?
It has a flavor of functional programming.
When we run generate_test_compatibility_cache.py to update our regression test vectors, it doesn't delete the old test vectors, but there's no point in keeping them around. It would be better if the script deleted the old test vectors.
When Bionic initializes Matplotlib, it checks for the MacOSX backend and replaces it with TkAgg, since MacOSX only works when using a framework build of Python. However, if Tcl/Tk is not installed -- and on Macs it's not by default -- then TkAgg doesn't work either.
We should figure out if there's a smarter way to initialize Matplotlib, and then adjust the docs to explicitly mention any extra non-Python dependencies like Tcl/Tk.
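One possibly smarter initialization, sketched under the assumption that falling back to Matplotlib's non-interactive `Agg` backend is acceptable when Tk is unavailable (that assumption should be checked against how Bionic uses plots). It probes for Tk by importing `tkinter`, which fails with `ImportError` when Python was built or installed without Tcl/Tk. `choose_backend` is a hypothetical helper.

```python
import importlib


def choose_backend(current: str) -> str:
    """Pick a Matplotlib backend name that should work in this environment."""
    if current.lower() != "macosx":
        return current  # Leave non-MacOSX backends alone.
    try:
        importlib.import_module("tkinter")
    except ImportError:
        # No Tcl/Tk available: fall back to the non-interactive Agg backend.
        return "Agg"
    return "TkAgg"
```

The result would be applied with something like `matplotlib.use(choose_backend(matplotlib.get_backend()))`.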
In parallel execution mode, Bionic completes task states in the subprocesses. This seems fine, as it allows Bionic to execute different parts of the dependency graph in parallel when applicable. But when a task state already has a persisted artifact, the complete method will only load the entry in the subprocess and mark the task state complete. This isn't very useful, because we wipe out the cached entry in the cache accessor when the task state exits the subprocess. In that case, we are effectively spending time on IPC and on loading the cached inventory from disk just to mark the task state complete (a boolean).
We should change this behavior and compute task states in the subprocesses instead.
Currently you can retrieve in-memory values with Flow.get, which can optionally return multi-valued entities as a sequence; and you can retrieve serialized files with Flow.export, which can only return single-valued entities. We should have a way to export serialized files for multi-valued entities as well.
In the short term, I think the best way to do this is to add another parameter to get which indicates that we should return a serialized file instead of an object. Then we can deprecate export.
In the long term, I'd like to have a unified abstraction for retrieving entities which supports:
@gather-type operations
… Flow.get and from entity functions
Yeah, we should have done this a while ago. But now our dependencies have started breaking in Python 2 and it will be even more work to keep triaging them.
If you try to compute an entity value but one of its ancestors doesn't have a value set, the error message is not very helpful. It just looks like this: ValueError: Entity 'MY_ENTITY' has no defined values. We should make it easier to figure out what the problem is and how to resolve it.
Probably the most straightforward solution would be to attach an explanation that describes which of the ancestors are not set.
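A sketch of how that explanation could be computed: walk the dependency graph upward from the requested entity and collect every ancestor that has no definition (neither an assigned value nor a function). The plain-dict graph, the `defined` set, and `undefined_ancestors` are hypothetical; Bionic's real dependency data is structured differently.

```python
from collections import deque
from typing import Dict, List, Set


def undefined_ancestors(entity: str, deps: Dict[str, List[str]], defined: Set[str]) -> List[str]:
    """Return the (transitive) ancestors of `entity` that have no definition."""
    missing = []
    seen = {entity}
    queue = deque([entity])
    while queue:  # Breadth-first search over upstream entities.
        name = queue.popleft()
        if name not in defined:
            missing.append(name)
        for parent in deps.get(name, []):
            if parent not in seen:
                seen.add(parent)
                queue.append(parent)
    return sorted(missing)
```

The error could then read something like: "Can't compute 'model' because these upstream entities have no values: ['alpha']".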
Bionic currently caches every computed value in memory. However, when we have many large values this is impractical. We should have some kind of decorator that tells Bionic not to cache in memory (but still cache to disk, presumably).
Sometimes users pass a single argument to @bn.outputs, expecting it to work like @bn.output:
builder = bn.FlowBuilder('test')
@builder
@bn.outputs('value')
def _():
    return 7
flow = builder.build()
flow.get('value')
This fails because @outputs expects the decorated function to return a sequence with one value. The correct thing would be to either return the tuple (7,), or use @bn.output instead of @bn.outputs. However, the error message could be more helpful:
EntityValueError: Value received for '<value>,' is not valid for TupleProtocol due to <class 'AssertionError'>: 'int' object is not iterable
The message is correct, but it would be better to state what was expected (a sequence of length 1). We could even go further and say something like "did you mean to use @output instead of @outputs?". (Also, we should probably replace the <class 'AssertionError'> with just AssertionError.)
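A sketch of a more helpful check for this case, plus the exception formatting fix from the last sentence. `check_outputs_value` and `describe_exception` are hypothetical helpers, the message wording is a suggestion, and requiring a non-string `Sequence` is an assumption (Bionic's tuple protocol may accept other iterables).

```python
from collections.abc import Sequence
from typing import List


def check_outputs_value(value: object, names: List[str]) -> None:
    """Validate a multi-output function's return value, with a helpful hint."""
    is_sequence = isinstance(value, Sequence) and not isinstance(value, (str, bytes))
    if is_sequence and len(value) == len(names):
        return
    message = (
        f"Expected a sequence of {len(names)} value(s) for outputs {names}, "
        f"but got {type(value).__name__}: {value!r}."
    )
    if len(names) == 1:
        message += " Did you mean to use @output instead of @outputs?"
    raise ValueError(message)


def describe_exception(e: BaseException) -> str:
    # "AssertionError: ..." rather than "<class 'AssertionError'>: ..."
    return f"{type(e).__name__}: {e}"
```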