dagster-io / dagster
An orchestration platform for the development, production, and observation of data assets.
Home Page: https://dagster.io
License: Apache License 2.0
Had to fix a relatively obvious bug. Let's get this under test.
Right now, because of the initial architecture, the pandas dataframe_solid has a single source called "CSVORPARQUET" which takes a format argument. This should be separated into two distinct sources, one CSV, one Parquet. CSV is a materialization type.
Right now in dagster there is the json-file-handler branch. This has been developed in tandem against the add-logging-to-json-file branch in the superconductive repo. That branch works as is, but I would like the implementation in json-file-handler to be higher quality before merging back into develop.
I propose
dagster <noun> <verb> <target>
e.g.
dagster pipeline graphviz pipeline_name
dagster pipeline execute pipeline_name
dagster pipeline list-all
etc.
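For illustration, a minimal sketch of the noun/verb shape using click (command names here are just the proposal above, not the current cli):

import click

@click.group()
def dagster_cli():
    """Root command: dagster <noun> <verb> <target>."""

@dagster_cli.group(name='pipeline')
def pipeline_group():
    """The <noun>."""

@pipeline_group.command(name='execute')
@click.argument('pipeline_name')
def execute_command(pipeline_name):
    """The <verb> taking the <target>: dagster pipeline execute pipeline_name."""
    click.echo('executing {}'.format(pipeline_name))

@pipeline_group.command(name='list-all')
def list_all_command():
    click.echo('listing all pipelines')

if __name__ == '__main__':
    dagster_cli()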
Right now in the clarify repo we have a single expectation that evaluates the entire set of expectations in a ge json file. What we really want to do is expose each individual expectation in that file as its own ExpectationDefinition, so that it is inspectable by the metadata systems and configurable by policy, etc.
@Aylr had an excellent suggestion: being able to refresh the dag layout without forcing a dagit restart. I think a refresh button in the UI itself would work fine but we could also try to do something more clever.
But leave stuff like the python path user-specific.
It'd be great to be able to depend on dagster from requirements.txt files. I don't know what needs to be done vis-a-vis organizations, ownership, etc. to make that possible.
This causes a whole rash of lint errors because it collides with the pipeline variable name used throughout the file. Python is annoying and doesn't have a separate namespace for variables versus functions, so we have to deal with this crap all the time. This is compounded by the fact that libraries like click love to automatically name things based on function/arg names, so you often have to work around it with odd code.
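The typical workaround looks like this (names hypothetical): give the function a non-colliding name and tell click the command name explicitly:

import click

pipeline = None  # module-level variable used throughout the file

# click derives the command name from the function name, so the function
# cannot simply be called `pipeline` without shadowing the variable above;
# instead, rename the function and pass the command name explicitly.
@click.command(name='pipeline')
def pipeline_command():
    click.echo('pipeline command')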
This just needs to go. We should drive everything from the main cli
I commented out dagster execute as it currently stands.
We should drive it from environment files as proposed at https://github.com/superconductive/clarify-data-warehouse-201803/tree/hello_dagster_introduce_command_line/clarify_kickoff/clarify_kickoff
Copying and pasting here:
The next obvious thing to do will be to be able to execute the pipeline. We'll introduce a yaml config file in order to do this. Each yaml file will represent an environment. An environment is effectively the set of all parameters into the pipeline. This way you can have an environment for testing, staging, production, and so forth. Each environment lives in its own file. (We could also support multiple environments per file. Open to feedback, but there are tradeoffs there.)
Say there is a file: lds_backfill_prod.yml
environment:
  inputs:
    - name: s3
      args:
        bucket: some_bucket
        access_key: some_key
        secret_key: some_secret_key
There will be an execution command of the form:
dagster pipeline execute lds_backfill --environment=lds_backfill_prod.yml
You'll also be able to do fun stuff like execute subsets of the DAG.
dagster pipeline execute lds_backfill --environment=lds_backfill_prod.yml --from upload_files_to_s3
The above command would execute all solids from upload_files_to_s3 and onward. So upload_files_to_s3 --> create_lds_table_solid --> copy_data_to_lds_table --> populate_abstract_from_lds_table in this example (as of 06/04/2018)
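To make the --from semantics concrete, here is a hypothetical sketch (not dagster code) that selects the starting solid plus everything downstream of it, given a map from each solid to its upstream dependencies:

# deps maps each solid to the solids it depends on (its upstreams).
def solids_from(deps, start):
    selected = {start}
    changed = True
    while changed:
        changed = False
        for solid, upstreams in deps.items():
            if solid not in selected and selected & set(upstreams):
                selected.add(solid)
                changed = True
    return selected

deps = {
    'upload_files_to_s3': [],
    'create_lds_table_solid': ['upload_files_to_s3'],
    'copy_data_to_lds_table': ['create_lds_table_solid'],
    'populate_abstract_from_lds_table': ['copy_data_to_lds_table'],
}
# Starting at the first solid selects the entire chain, as in the example above.
assert solids_from(deps, 'upload_files_to_s3') == set(deps)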
The command line tooling in dagster is a mess. This should be consolidated into a single, sensible command that provides a lot of value.
Initially I built this so that you treated a file that contained the definition of a pipeline as a command line tool. This was done by conditionally invoking the pipeline if the module was __main__. This was a mistake. We want to be able to invoke a pipeline from a traditional-feeling command line tool.
embedded_cli.py is the code that drives this so-called "embedded" cli.
To see an example of this, run:
python3 pipelines.py in dagster/dagster_examples
That is the multi-pipeline case.
Or:
python3 pipeline.py in dagster/dagster_examples/qhp
That is the single-pipeline case.
This should be sub-divided into a number of tasks:
We need to enable multiple instances of an output. This will allow a "streaming batch" semantic in the pipeline that spawns multiple copies of a downstream DAG. There are a lot of challenges to solve here as you would also want a way to coalesce multiple outstanding subdags. This is essentially map-reduce.
At minimum, contexts.
https://github.com/AoiKuiyuyou/AoikLiveReload/blob/master/src/aoiklivereload/aoiklivereload.py
We only do reloading at per-module (file) granularity. That is not enough, because any other code the module depends on will not be reloaded.
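Concretely, with stdlib reloading the problem looks like this (module name illustrative):

import importlib
import pipelines  # the module that defines the pipelines

# Reloading picks up edits to pipelines.py itself...
importlib.reload(pipelines)
# ...but not edits to modules that pipelines.py imports; those retain the
# definitions from their original import unless each is reloaded in turn.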
This was always just a concession to the odd embedded_cli construct. Since we are getting rid of the embedded cli, let's get rid of this. Although @freiksenet, I'm impressed you found make_pass_decorator in click. Very clever.
I didn't realize that your branch broke so many tests before I merged. :-/ Please fix first thing.
Set up a CI/CD pipeline. Decide on Travis/Circle/etc. and then implement it.
dagster.check.CheckError: Invariant failed. Description: dep must exist got: <<your_missing_solid>> and set {'<<solids>>'}
Now that we have implemented multiple outputs we can actually implement branching. However the system does not support this yet and fatals if you do not emit all the outputs that a solid specifies. This branch contains a repro test case that is failing.
https://github.com/dagster-io/dagster/tree/handle-branching-in-multiple-outputs
This requires a non-trivial change to the core execution pipeline.
File "../../bin/dagster_driver.py", line 9, in <module>
dagster.cli.dagster_cli(obj={})
File "/home/freiksenet/Work/elemental/venv/lib/python3.6/site-packages/click/core.py", line 722, in __call__
return self.main(*args, **kwargs)
File "/home/freiksenet/Work/elemental/venv/lib/python3.6/site-packages/click/core.py", line 697, in main
rv = self.invoke(ctx)
File "/home/freiksenet/Work/elemental/venv/lib/python3.6/site-packages/click/core.py", line 1066, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/home/freiksenet/Work/elemental/venv/lib/python3.6/site-packages/click/core.py", line 895, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/home/freiksenet/Work/elemental/venv/lib/python3.6/site-packages/click/core.py", line 535, in invoke
return callback(*args, **kwargs)
File "/home/freiksenet/Work/elemental/venv/lib/python3.6/site-packages/click/decorators.py", line 64, in new_func
return ctx.invoke(f, obj, *args[1:], **kwargs)
File "/home/freiksenet/Work/elemental/venv/lib/python3.6/site-packages/click/core.py", line 535, in invoke
return callback(*args, **kwargs)
File "/home/freiksenet/Work/elemental/dagster/dagster/cli/__init__.py", line 108, in list_command
pipeline_configs = config.create_pipelines()
File "/home/freiksenet/Work/elemental/dagster/dagster/cli/__init__.py", line 52, in create_pipelines
pipeline_config.create_pipeline()
File "/home/freiksenet/Work/elemental/dagster/dagster/cli/__init__.py", line 79, in create_pipeline
self.pipeline = self.fn()
File "/home/freiksenet/Work/elemental/dagster/dagster/dagster_examples/sql_hello_world/pipeline.py", line 32, in define_pipeline
sum_table_solid = dagster_sa.create_sql_solid(
AttributeError: module 'dagster.sqlalchemy_kernel' has no attribute 'create_sql_solid'
Need to support different versions of python
We need to do a pass on naming definitions. At minimum, inputs and outputs should consistently be input_defs and output_defs. This is mostly because python is dumb and has "input" as a builtin which no one ever uses, so you are stuck with input_ or input_def or similar. Probably all of these things (solids, arguments, contexts) should be suffixed with def by default, but I do still react negatively to that.
We have avoided using "big" python-3-only features, such as type hints and async/await. However, it is not tested against 2.7, and there are more trivial incompatibilities, such as f-strings and required keyword arguments.
We should set up tox to test against 2.7 and make this happen.
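A minimal tox.ini sketch, assuming pytest is the test runner:

[tox]
envlist = py27,py36

[testenv]
deps =
    pytest
commands =
    pytest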
The command line utility already does this, but the descriptions coming out of graphql should probably be dedented and wrapped to some degree. Python's textwrap.TextWrapper is relevant here.
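Something along these lines (a sketch; the function name is made up):

import textwrap

def format_description(raw, width=80):
    # Docstring-style text arrives indented to match the source; dedent it,
    # then re-wrap the single paragraph to the target width.
    dedented = textwrap.dedent(raw).strip()
    wrapper = textwrap.TextWrapper(width=width)
    return '\n'.join(wrapper.wrap(dedented))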
Named tuples would be much clearer than using bare dictionaries.
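For example (names hypothetical):

from collections import namedtuple

# With a bare dict, a typo in a key fails silently at lookup time; a
# namedtuple makes the shape explicit and fails loudly at construction.
SolidResult = namedtuple('SolidResult', 'name success value')

result = SolidResult(name='sum', success=True, value=42)
print(result.success)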
While the way I implemented it is broken, I think we do need lazy loading for pipelines in the cli.
If you have several pipelines and one of them is broken, it should still be possible to list pipelines and to run the non-broken ones. Currently that's not possible, because we force every pipeline to be constructed just to get its name. Should metadata like the name be available in a way that you can get it without actually creating the pipeline?
In addition, I am prematurely optimizing, expecting some pipelines to load slowly.
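One possible shape, sketched under the assumption that pipelines are registered as zero-argument factories keyed by name (class and method names made up):

class LazyPipelineRegistry:
    def __init__(self):
        self._factories = {}

    def register(self, name, factory):
        self._factories[name] = factory

    def list_names(self):
        # No factories invoked: a broken pipeline cannot break listing.
        return sorted(self._factories)

    def load(self, name):
        # Construction (and any failure) is deferred to the point of use.
        return self._factories[name]()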
This will likely involve introducing ArgumentDefinitions and also a more formal type system (with non-null just like GraphQL)
Driven by the need to make log_level optional for the default context args.
Placeholder task. We don't have a good way to evaluate the results of an expectation atm.
Panning and zooming around dagit is a pretty awful user experience at the moment. It is too sensitive to movement, and the diagram moves around as you zoom in. Too frequently it zooms off the screen entirely. This does not need to be perfect, but it needs some tweaking in the short term. People are using this to debug real things now, and it is a real usability issue.
Error messages
Usability issues
Focus on the python apis
The existing result API that is returned from execute_pipeline is no longer usable at all. We need a new one which is aware of the fact that each solid can emit multiple results.
Right now the system mandates that the signature of the transform function matches the input definitions. However, the error messages suck and only kick in at runtime. We could do a much nicer job by detecting the problem upfront and raising the error at definition construction time using reflection.
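A rough sketch of the definition-time check (function and attribute names here are assumptions):

import inspect

def check_transform_signature(transform_fn, input_defs):
    # The transform's parameter names must match the declared input names,
    # so a mismatch raises here rather than deep inside pipeline execution.
    # Assumes each input_def has a .name attribute.
    expected = set(input_def.name for input_def in input_defs)
    actual = set(inspect.signature(transform_fn).parameters)
    if expected != actual:
        raise TypeError(
            '{fn} params {actual} do not match declared inputs {expected}'.format(
                fn=transform_fn.__name__,
                actual=sorted(actual),
                expected=sorted(expected),
            )
        )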
For inputs that are dependencies, right now the APIs strongly encourage/enforce that the input name and the solid name be the same. That's a mistake, I believe. It can lead to very awkward names for transform function arguments. More importantly, someone changing the name of the solid would break any solid that depends on that name.
The best example is dep_only_input. This should take a name.
We also might want to consider an API that instead passes in a dictionary of input specifications where the name is the key. This would enforce uniqueness, etc.
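Sketched, the proposed dict-keyed form might look like this (hypothetical, not the current API; InputSpec is a stand-in just to show the shape):

from collections import namedtuple

InputSpec = namedtuple('InputSpec', 'sources depends_on')

inputs = {
    'num_csv': InputSpec(sources=['csv'], depends_on=None),
    'sum_df': InputSpec(sources=[], depends_on='sum'),
}

# Duplicate names are impossible by construction, and renaming the upstream
# solid only touches the depends_on value, not the input name itself.
assert len(inputs) == 2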
Right now we can "inject" solids into a sub_pipeline using PipelineDefinition.create_single_solid_pipeline and PipelineDefinition.create_sub_pipeline.
However, we want to be able to do this via config. In order to do that we will need to be able to construct solids with only a string as a key. This will necessitate the ability to register solids, look them up by string, and parameterize them. You can think of these as "higher order solids" in a way. The libraries need to declare a way to create solids, and then config can be set up to invoke those higher order solids to produce solids. This sounds complicated, but when seen in practice it will be quite intuitive.
We want to be able to specify something like:
injected_solids:
  solid_name:
    input_name:
      solid: load_pandas_df_from_csv
      args:
        delimiter: "|"
This config would dynamically create a solid which is injected into the pipeline.
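A sketch of what the lookup might involve (registry and function names are made up):

SOLID_FACTORIES = {}

def register_solid_factory(name, factory):
    SOLID_FACTORIES[name] = factory

def solid_from_config(entry):
    # entry mirrors the yaml above: {'solid': <factory name>, 'args': {...}}
    factory = SOLID_FACTORIES[entry['solid']]
    return factory(**entry.get('args', {}))

# e.g. register_solid_factory('load_pandas_df_from_csv', load_pandas_df_from_csv)
# then solid_from_config({'solid': 'load_pandas_df_from_csv', 'args': {'delimiter': '|'}})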
Logging is still a bit of a mess. The coloredlogs module installs a logger that is global, I believe, so it persists between different execution context creations. This can cause a lot of confusion (for me at least).
It would be fantastic to have a new general model for dealing with python's BS, global-state-polluting, java-naming-conventioned logging API.
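One possible direction, sketched with the stdlib (names are assumptions):

import logging

def make_context_logger(run_id, level=logging.INFO):
    # A logger namespaced per execution context, so handlers and levels
    # don't leak between execution context creations.
    logger = logging.getLogger('dagster.run.{}'.format(run_id))
    logger.setLevel(level)
    if not logger.handlers:
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter('%(name)s %(levelname)s %(message)s'))
        logger.addHandler(handler)
    logger.propagate = False  # keep records away from the globally-configured root
    return logger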
I'd like to switch back to the old form of building the command line interface. How the main cli is getting constructed is a little obtuse at this point. I strongly prefer code such as:
dagster_command_group = click.Group(name='dagster')
dagster_command_group.add_command(embedded_dagster_multi_pipeline_graphviz_command)
dagster_command_group.add_command(embedded_dagster_multi_pipeline_pipelines_command)
dagster_command_group.add_command(embedded_dagster_multi_pipeline_output_command)
dagster_command_group.add_command(embedded_dagster_multi_pipeline_execute_command)
Although inelegant, it is super obvious what is going on, and I can immediately jump to the command in question by clicking on the symbol in vscode. Let's just be dead obvious here.
To test the --from parameter, I've added an intermediate input to one of the solids in pandas_hello_world as follows:
def sum_transform_fn(num_csv):
    sum_df = num_csv.copy()
    sum_df['sum'] = sum_df['num1'] + sum_df['num2']
    return sum_df

sum_solid = dagster_pd.dataframe_solid(
    name='sum', inputs=[dagster_pd.csv_input('num_csv')], transform_fn=sum_transform_fn
)

def sum_sq_transform_fn(sum):
    sum_sq = sum.copy()
    sum_sq['sum_sq'] = sum['sum']**2
    return sum_sq

sum_sq_solid = dagster_pd.dataframe_solid(
    name='sum_sq',
    inputs=[
        dagster_pd.csv_input('sum_csv'),
        dagster_pd.depends_on(sum_solid),
    ],
    transform_fn=sum_sq_transform_fn
)
When I try to run it, it complains about one of the inputs not being defined. I'm not sure that's expected behavior, because I thought only one input was needed per solid. I'm not sure how to run intermediate solids if one can't specify the results of intermediate computations as alternative inputs.
No nitpicking on formatting. Just use a tool and be done with it.