
dagster-io / dagster

10.2K stars · 113 watchers · 1.3K forks · 955.32 MB

An orchestration platform for the development, production, and observation of data assets.

Home Page: https://dagster.io

License: Apache License 2.0

Languages: Python 80.77%, TypeScript 18.09%, Jupyter Notebook 0.62%, JavaScript 0.22%, Smarty 0.10%, Dockerfile 0.07%, Mustache 0.06%, Makefile 0.02%, Shell 0.02%, Jinja 0.02%, HTML 0.01%, Mako 0.01%, CSS 0.01%
Topics: data-pipelines, dagster, workflow, data-science, workflow-automation, python, scheduler, data-orchestrator, etl, analytics


dagster's Issues

Separate Pandas Dataframe Solid into Two Sources

Right now, because of the initial architecture, the pandas dataframe_solid has a single source called "CSVORPARQUET" which takes a format argument. This should be separated into two distinct sources: one CSV, one Parquet.

Materialization issues

  • As I understand it, materialization definitions are implicit, so the execution params specify materializations for solids. Currently drivers don't expose their supported materializations, so we should have a way to extract that metadata from a driver. Otherwise it is very hard to discover that CSV is a materialization type.
  • I guess materialization should also be part of the config/env yaml file for the pipeline. Another option is to have materializations defined separately and then selectively enabled (e.g. by saying --materialization NAME or --materialization TAG).

Write JSON File Handler well and write tests

Right now in dagster there is the json-file-handler branch. It has been developed in tandem with the add-logging-to-json-file branch in the superconductive repo. That branch works as-is, but I would like the implementation in json-file-handler to be higher quality before merging it back into develop.

Create UX standards for the CLI tool

I propose

    dagster <noun> <verb> <target>

e.g.

    dagster pipeline graphviz pipeline_name
    dagster pipeline execute pipeline_name
    dagster pipeline list-all

etc.
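
A minimal sketch of this shape with click; the group and command names below are illustrative, not the actual implementation:

    import click


    @click.group()
    def dagster_cli():
        """Root 'dagster' command."""


    @dagster_cli.group(name='pipeline')
    def pipeline_group():
        """The 'pipeline' noun."""


    @pipeline_group.command(name='execute')
    @click.argument('pipeline_name')
    def execute_command(pipeline_name):
        """The 'execute' verb: dagster pipeline execute <target>."""
        click.echo('executing {0}'.format(pipeline_name))


    @pipeline_group.command(name='list-all')
    def list_all_command():
        """The 'list-all' verb: dagster pipeline list-all."""
        click.echo('listing pipelines')


    if __name__ == '__main__':
        dagster_cli()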

Generate multiple dagster Expectations from a Great Expectations JSON file

Right now in the clarify repo we have a single expectation that evaluates the entire set of expectations in a GE JSON file. What we really want is to expose each individual expectation in that file as its own ExpectationDefinition, so that it is inspectable by the metadata systems, configurable by policy, etc.
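
A rough sketch of the shape this could take. The ExpectationDefinition constructor here is a stand-in for the real dagster type, and the evaluation body is elided; the GE file structure (an "expectations" list of entries with "expectation_type" and "kwargs") is the real format:

    import json
    from collections import namedtuple

    # Stand-in for dagster's ExpectationDefinition; the real constructor is
    # an assumption here, so treat this as a sketch of the shape, not the API.
    ExpectationDefinition = namedtuple('ExpectationDefinition', 'name expectation_fn')


    def expectation_defs_from_ge_file(path):
        """One ExpectationDefinition per entry in a Great Expectations JSON
        file, so each is individually inspectable and configurable."""
        with open(path) as f:
            ge_config = json.load(f)

        defs = []
        for index, expectation in enumerate(ge_config['expectations']):
            name = '{0}_{1}'.format(expectation['expectation_type'], index)

            def make_fn(exp=expectation):
                def _evaluate(dataframe):
                    # Evaluation elided: dispatch exp['expectation_type'] with
                    # exp['kwargs'] against the dataframe via great_expectations.
                    raise NotImplementedError
                return _evaluate

            defs.append(ExpectationDefinition(name=name, expectation_fn=make_fn()))
        return defs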

Release the repo publicly

It'd be great to be able to

  1. point people to a public-facing repo and
  2. include dagster in requirements.txt files.

I don't know what needs to be done vis-à-vis organizations, ownership, etc. to make that possible.

Rename pipeline function in cli/__init__.py

This causes a rash of lint errors because it collides with the pipeline variable name used throughout the file. Python is annoying and doesn't have separate namespaces for variables versus functions, so we have to deal with this all the time. This is compounded by the fact that libraries like click love to automatically name things based on function/argument names, so you often have to work around this with odd code.

Implement dagster execute

I commented out dagster execute as it currently stands.

We should drive it from environment files as proposed at https://github.com/superconductive/clarify-data-warehouse-201803/tree/hello_dagster_introduce_command_line/clarify_kickoff/clarify_kickoff

Copying and pasting here:

Pipeline Execution and Configuration

The next obvious thing to do is to be able to execute the pipeline. We'll introduce a yaml config file in order to do this. Each yaml file will represent an environment. An environment is effectively the set of all parameters into the pipeline. This way you can have an environment for testing, staging, production, and so forth. Each environment lives in its own file. (We could also support multiple environments per file. Open to feedback, but there are tradeoffs there.)

Say there is a file: lds_backfill_prod.yml

environment:
  inputs:
    - name: s3
      args:
        bucket: some_bucket 
        access_key: some_key
        secret_key: some_secret_key 

There will be an execution command of the form:

dagster pipeline execute lds_backfill --environment=lds_backfill_prod.yml

You'll also be able to do fun stuff like execute subsets of the DAG.

dagster pipeline execute lds_backfill --environment=lds_backfill_prod.yml --from upload_files_to_s3

The above command would execute all solids from upload_files_to_s3 onward, so upload_files_to_s3 --> create_lds_table_solid --> copy_data_to_lds_table --> populate_abstract_from_lds_table in this example (as of 06/04/2018).
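
For concreteness, a minimal sketch of reading such an environment file with PyYAML; the execute_pipeline call at the end is a placeholder for whatever the real entry point ends up being:

    import yaml


    def load_environment(path):
        """Parse an environment yaml file into a dict of input args keyed by
        input name."""
        with open(path) as f:
            environment = yaml.safe_load(f)['environment']

        # e.g. {'s3': {'bucket': ..., 'access_key': ..., 'secret_key': ...}}
        return {
            input_spec['name']: input_spec.get('args', {})
            for input_spec in environment['inputs']
        }


    inputs = load_environment('lds_backfill_prod.yml')
    # execute_pipeline(pipeline, inputs=inputs)  # placeholder entry point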

Consolidate and build high quality command line tooling (master task)

The command line tooling in dagster is a mess. This should be consolidated into a single, sensible command that provides a lot of value.

Initially I built this so that a file containing the definition of a pipeline was treated as a command line tool itself, by conditionally invoking the pipeline when the module was run as main. This was a mistake. We want to be able to invoke a pipeline from a traditional-feeling command line tool.

embedded_cli.py is the code that drives this so-called "embedded" cli.

To see an example of this, run

    python3 pipelines.py

in dagster/dagster_examples. That is the multi-pipeline case. Or run

    python3 pipeline.py

in dagster/dagster_examples/qhp. That is the single-pipeline case.

This should be sub-divided into a number of tasks:

  • File layout for making the dagster pipeline tool aware of pipelines. This is essentially already implemented. See dagster/cli/__init__.py:12-25.
  • Listing pipelines and their descriptions (also those lines, but this should be improved).
  • A good way of printing out pipeline metadata. Currently print_pipeline in embedded_cli.py. It is not good. Invoked using the "meta" command in the single-pipeline case.
  • Consume a yaml file that constructs the objects defined in dagster/config.py. We want to be able to drive a pipeline execution or materialization from a yaml file.
  • Be able to codegen a yaml file based on the declared definition of a pipeline.
  • Be able to verify the validity of an incoming yaml file against the definition of a pipeline (not high priority; a rough sketch follows this list).
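
On that last point, validation can start as a simple diff of the input names in the parsed yaml against the names the pipeline declares. A minimal sketch, assuming the environment dict shape from the example above:

    def validate_environment(environment, declared_input_names):
        """Compare inputs named in a parsed environment dict against the input
        names declared by the pipeline definition."""
        provided = set(spec['name'] for spec in environment['inputs'])
        declared = set(declared_input_names)

        errors = []
        for extra in sorted(provided - declared):
            errors.append('input {0!r} is not declared by the pipeline'.format(extra))
        for missing in sorted(declared - provided):
            errors.append('declared input {0!r} has no value in the environment'.format(missing))
        return errors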

Enabling multiple instances of an output (ends up being map reduce)

We need to enable multiple instances of an output. This will allow a "streaming batch" semantic in the pipeline that spawns multiple copies of a downstream DAG. There are a lot of challenges to solve here as you would also want a way to coalesce multiple outstanding subdags. This is essentially map-reduce.

Eliminate pass_pipeline in the cli

This was always just a concession to the odd embedded_cli construct. Since we are getting rid of the embedded CLI, let's get rid of this too. Although, @freiksenet, I'm impressed you found make_pass_decorator in click. Very clever.

CI/CD

Set up a CI/CD pipeline. Decide on Travis, Circle, etc., and then implement it.

Make missing solid error better

STR (steps to reproduce):

  1. Make a solid have a dependency on a solid not listed in the pipeline instantiation.
  2. Run, and watch for this:

    dagster.check.CheckError: Invariant failed. Description: dep must exist got: <<your_missing_solid>> and set {'<<solids>>'}
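
One way to make this friendlier is to name the missing solid and list what the pipeline actually contains. A sketch, using a hypothetical definition-time error type:

    class DagsterInvalidDefinitionError(Exception):
        """Hypothetical stand-in for a definition-time error type."""


    def check_dependency_exists(dep_name, pipeline_solid_names):
        """Fail with an actionable message instead of a bare invariant check."""
        if dep_name not in pipeline_solid_names:
            raise DagsterInvalidDefinitionError(
                'Solid {0!r} is listed as a dependency but is not in the '
                'pipeline. Solids in this pipeline: {1}'.format(
                    dep_name, sorted(pipeline_solid_names)
                )
            )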

Bug in sql_hello_world example

  File "../../bin/dagster_driver.py", line 9, in <module>
    dagster.cli.dagster_cli(obj={})
  File "/home/freiksenet/Work/elemental/venv/lib/python3.6/site-packages/click/core.py", line 722, in __call__
    return self.main(*args, **kwargs)
  File "/home/freiksenet/Work/elemental/venv/lib/python3.6/site-packages/click/core.py", line 697, in main
    rv = self.invoke(ctx)
  File "/home/freiksenet/Work/elemental/venv/lib/python3.6/site-packages/click/core.py", line 1066, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/freiksenet/Work/elemental/venv/lib/python3.6/site-packages/click/core.py", line 895, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/freiksenet/Work/elemental/venv/lib/python3.6/site-packages/click/core.py", line 535, in invoke
    return callback(*args, **kwargs)
  File "/home/freiksenet/Work/elemental/venv/lib/python3.6/site-packages/click/decorators.py", line 64, in new_func
    return ctx.invoke(f, obj, *args[1:], **kwargs)
  File "/home/freiksenet/Work/elemental/venv/lib/python3.6/site-packages/click/core.py", line 535, in invoke
    return callback(*args, **kwargs)
  File "/home/freiksenet/Work/elemental/dagster/dagster/cli/__init__.py", line 108, in list_command
    pipeline_configs = config.create_pipelines()
  File "/home/freiksenet/Work/elemental/dagster/dagster/cli/__init__.py", line 52, in create_pipelines
    pipeline_config.create_pipeline()
  File "/home/freiksenet/Work/elemental/dagster/dagster/cli/__init__.py", line 79, in create_pipeline
    self.pipeline = self.fn()
  File "/home/freiksenet/Work/elemental/dagster/dagster/dagster_examples/sql_hello_world/pipeline.py", line 32, in define_pipeline
    sum_table_solid = dagster_sa.create_sql_solid(
AttributeError: module 'dagster.sqlalchemy_kernel' has no attribute 'create_sql_solid'

Make naming consistent for definitions

We need to do a pass on naming definitions. At minimum, inputs and outputs should consistently be input_defs and output_defs. This is mostly because Python has "input" as a builtin which no one ever uses, so you are stuck with input_, input_def, or similar. Probably all of these things (solids, arguments, contexts) should be suffixed with def by default, but I do still react negatively to that.

Make Dagster Python 2.7 & 3.5 compatible

We have avoided using "big" Python-3-only features, such as type hints and async/await. However, the code is not tested against 2.7, and there are more trivial incompatibilities such as f-strings and keyword-only arguments.

We should set up tox to test against 2.7 and make this happen.
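
A starting point for the tox config; the interpreter list and the pytest invocation are assumptions about how the suite is run:

    # tox.ini (sketch; adjust deps and commands to match the actual test suite)
    [tox]
    envlist = py27,py35,py36

    [testenv]
    deps =
        pytest
    commands =
        pytest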

Lazy loading pipelines

While the way I implemented it is broken, I think we do need lazy loading for pipelines in the CLI.

If you have several pipelines and one of them is broken, it should still be possible to list pipelines and to run the non-broken ones. Currently that's not possible, because we force every pipeline to be constructed just to get its name. Should metadata like the name be available in a way that doesn't require actually creating the pipeline?

In addition, I am prematurely optimizing: I expect some pipelines to load slowly.
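
A sketch of the shape lazy loading could take: register (name, factory) pairs, defer construction until a pipeline is actually requested, and let listing read only the registered names. All names here are illustrative:

    class LazyPipelineRepository(object):
        """Holds pipeline factories; construction is deferred until needed."""

        def __init__(self):
            self._factories = {}

        def register(self, name, factory):
            self._factories[name] = factory

        def pipeline_names(self):
            # Listing never constructs a pipeline, so one broken definition
            # cannot break listing the others.
            return sorted(self._factories)

        def get_pipeline(self, name):
            # Construction (and any failure) happens here, per pipeline.
            return self._factories[name]()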

Enable optional arguments

This will likely involve introducing ArgumentDefinitions and a more formal type system (with non-null, just like GraphQL).

Driven by the need to make log_level optional for the default context args.
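
A sketch of what such a definition could look like; the class and field names are assumptions, with non-null as the default in the GraphQL spirit:

    class ArgumentDefinition(object):
        """Hypothetical shape: arguments are required (non-null) unless
        explicitly marked optional, mirroring GraphQL's type system."""

        def __init__(self, dagster_type, is_optional=False, default_value=None):
            self.dagster_type = dagster_type
            self.is_optional = is_optional
            self.default_value = default_value


    # log_level becomes optional for the default context args, e.g.:
    # 'log_level': ArgumentDefinition(String, is_optional=True, default_value='INFO')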

Fix dagit panning and zooming

Panning and zooming around dagit is a pretty awful user experience at the moment. It is too sensitive to movement, and the diagram moves around as you zoom in. Too frequently it zooms off the screen entirely. This doesn't need to be perfect, but it needs some tweaking in the short term. People are using this to debug real things now, and it is a real usability issue.

New result API

The existing result API returned from execute_pipeline is no longer usable at all. We need a new one that is aware of the fact that each solid can emit multiple results.
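
A sketch of the shape such an API could take (all names are illustrative): the top-level result holds a flat list of per-solid results, and the accessor for a given solid is plural by construction:

    class PipelineExecutionResult(object):
        """Aggregates results across a run; a solid may appear many times."""

        def __init__(self, solid_results):
            self._solid_results = list(solid_results)

        @property
        def success(self):
            return all(result.success for result in self._solid_results)

        def results_for_solid(self, solid_name):
            # Plural on purpose: a solid can emit multiple results.
            return [r for r in self._solid_results if r.solid_name == solid_name]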

Detect mismatch between input definitions and transform function signature

Right now the system mandates that the signature of the transform function matches the input definitions. However, the error messages are bad and only kick in at runtime. We could do a much nicer job: detect the problem upfront and raise the error at definition construction time using reflection.
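
The reflection itself is cheap with the standard library. A sketch of the definition-time check, assuming each input definition exposes a name attribute:

    import inspect


    def check_transform_signature(transform_fn, input_defs):
        """Raise at definition construction time if the transform function's
        parameters do not match the declared input names."""
        params = set(inspect.signature(transform_fn).parameters)
        declared = set(input_def.name for input_def in input_defs)

        if params != declared:
            raise ValueError(
                'Transform {0} does not match its input definitions. '
                'Undeclared parameters: {1}; unused inputs: {2}'.format(
                    transform_fn.__name__,
                    sorted(params - declared),
                    sorted(declared - params),
                )
            )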

Encourage the use of explicit input names when specifying dependencies

For inputs that are dependencies, right now the APIs strongly encourage/enforce that the input name and the solid name be the same. That's a mistake, I believe. It can lead to very awkward names for transform function arguments. More importantly, someone changing the name of a solid would break any solid that depends on that name.

Best example is dep_only_input. This should take a name.

We also might want to consider an API that instead passes in a dictionary of input specifications where the name is the key. This would enforce uniqueness, etc.

Solid Registry and config-driven solid definition

Right now we can "inject" solids into a sub_pipeline using PipelineDefinition.create_single_solid_pipeline and PipelineDefinition.create_sub_pipeline.

However, we want to be able to do this via config. In order to do that we will need to be able to construct solids with only a string as a key. This will necessitate the ability to register solids, look them up by string, and parameterize them. You can think of these as "higher order solids" in a way. The libraries need to declare a way to create solids, and then config can be set up to invoke those higher order solids to produce solids. This sounds complicated, but seen in practice it will be quite intuitive.

We want to be able to specify something like

injected_solids:
  solid_name:
    input_name:
      solid: load_pandas_df_from_csv
      args:
        delimiter: "|"

This config would dynamically create a solid which is injected into the pipeline.
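
A sketch of the registry half of this: factories keyed by string, looked up and parameterized from exactly the config shape above. The registry and factory signature are assumptions:

    SOLID_REGISTRY = {}


    def register_solid_factory(key, factory):
        """Register a 'higher order solid': a function that, given args from
        config, returns a solid definition."""
        SOLID_REGISTRY[key] = factory


    def solid_from_config(config):
        """Build a solid from {'solid': <registry key>, 'args': {...}}."""
        factory = SOLID_REGISTRY[config['solid']]
        return factory(**config.get('args', {}))


    # e.g., after register_solid_factory('load_pandas_df_from_csv', some_factory):
    # solid_from_config({'solid': 'load_pandas_df_from_csv', 'args': {'delimiter': '|'}})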

Eliminate global state from logging

Logging is still a bit of a mess. The "colored logs" module installs a logger that is global, I believe, so it persists between different execution context creations. This can cause a lot of confusion (for me at least).

It would be fantastic to have a new general model for dealing with Python's global-state-polluting, Java-naming-conventioned logging API.
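
One general pattern that avoids the global state: build a dedicated Logger per execution context, attach handlers to it directly, and never touch the root logger. A sketch using only the standard library:

    import logging


    def make_context_logger(run_id, level=logging.INFO):
        """A logger scoped to one execution context. Nothing here mutates the
        root logger, so contexts cannot leak handlers into each other."""
        logger = logging.getLogger('dagster.run.{0}'.format(run_id))
        logger.setLevel(level)
        logger.propagate = False  # keep records away from globally-installed handlers

        if not logger.handlers:
            handler = logging.StreamHandler()
            handler.setFormatter(
                logging.Formatter('%(asctime)s %(name)s %(levelname)s %(message)s')
            )
            logger.addHandler(handler)
        return logger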

Prefer more verbose syntax for building click command line groups

I'd like to switch back to the old form of building the command line interface. How the main cli is getting constructed is a little obtuse at this point. I strongly prefer code such as:

    dagster_command_group = click.Group(name='dagster')
    dagster_command_group.add_command(embedded_dagster_multi_pipeline_graphviz_command)
    dagster_command_group.add_command(embedded_dagster_multi_pipeline_pipelines_command)
    dagster_command_group.add_command(embedded_dagster_multi_pipeline_output_command)
    dagster_command_group.add_command(embedded_dagster_multi_pipeline_execute_command)

Although inelegant, it is super obvious what is going on, and I can immediately jump to the command in question by clicking on the symbol within vscode. Let's just be dead obvious here.

Solid input requirements

To test the --from parameter, I added an intermediate input to one of the solids in pandas_hello_world as follows:

    def sum_transform_fn(num_csv):
        sum_df = num_csv.copy()
        sum_df['sum'] = sum_df['num1'] + sum_df['num2']
        return sum_df

    sum_solid = dagster_pd.dataframe_solid(
        name='sum', inputs=[dagster_pd.csv_input('num_csv')], transform_fn=sum_transform_fn
    )

    def sum_sq_transform_fn(sum):
        sum_sq = sum.copy()
        sum_sq['sum_sq'] = sum['sum']**2
        return sum_sq

    sum_sq_solid = dagster_pd.dataframe_solid(
        name='sum_sq',
        inputs=[
            dagster_pd.csv_input('sum_csv'),
            dagster_pd.depends_on(sum_solid),
        ],
        transform_fn=sum_sq_transform_fn
    )

When I try to run it, it complains about one of the inputs not being defined. I'm not sure that's expected behavior, because I thought only one input is needed per solid. I'm also not sure how to run intermediate solids if one can't specify the results of intermediate computations as an alternative input.
