Comments (7)
sum_sq_solid = dagster_pd.dataframe_solid(
    name='sum_sq',
    inputs=[
        dagster_pd.depends_on(sum_solid),
    ],
    transform_fn=sum_sq_transform_fn,
)
sum:
  source: CSVORPARQUET
  args:
    format: CSV
    path: whatever
Here is a summary of my proposal:
- Get rid of typed solid inputs. Inputs are either other solids or named inputs.
- Named inputs have an expectation about their OUTPUT format, for example dataframe or raw, but they have no expectation of how exactly to get there.
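To make the idea concrete, here is a minimal plain-Python sketch of a named input that only constrains its output format; this is not the real dagster API, and `NamedInput`, `SOURCES`, and `load_input` are invented names for illustration:

```python
# Hypothetical sketch, not real dagster API: a named input declares only
# the format it expects to RECEIVE; it has no opinion about the source.
class NamedInput:
    def __init__(self, name, expected_format):
        self.name = name
        self.expected_format = expected_format  # e.g. 'dataframe' or 'raw'

# Sources live in a separate registry; each returns (format, value).
SOURCES = {
    'CSV': lambda args: ('dataframe', [{'num1': 1, 'num2': 2}]),
    'RAW': lambda args: ('raw', b'\x00\x01'),
}

def load_input(inp, source, args):
    # Any source is acceptable, as long as its output format matches
    # what the named input expects.
    fmt, value = SOURCES[source](args)
    if fmt != inp.expected_format:
        raise TypeError(
            f"input {inp.name!r} expects {inp.expected_format}, got {fmt}"
        )
    return value

num = NamedInput('num', 'dataframe')
rows = load_input(num, 'CSV', {'path': 'num.csv'})  # CSV source yields a dataframe
```

The point is that the format check happens at the boundary, so the pipeline never needs to know which concrete source was configured.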
def define_pipeline():
    def sum_transform_fn(num_csv):
        sum_df = num_csv.copy()
        sum_df['sum'] = sum_df['num1'] + sum_df['num2']
        return sum_df

    sum_solid = dagster_pd.dataframe_solid(
        name='sum',
        inputs=[dagster_pd.dataframe_input('num')],
        transform_fn=sum_transform_fn,
    )

    def sum_sq_transform_fn(sum):
        sum_sq = sum.copy()
        sum_sq['sum_sq'] = sum['sum'] ** 2
        return sum_sq

    sum_sq_solid = dagster_pd.dataframe_solid(
        name='sum_sq',
        inputs=[dagster_pd.depends_on(sum_solid)],
        transform_fn=sum_sq_transform_fn,
    )

    def always_fails_transform_fn(*_args, **_kwargs):
        raise Exception('I am a programmer and I make error')

    always_fails_solid = dagster_pd.dataframe_solid(
        name='always_fails',
        inputs=[dagster_pd.depends_on(sum_solid)],
        transform_fn=always_fails_transform_fn,
    )

    return dagster.core.pipeline(
        name='pandas_hello_world',
        solids=[sum_solid, sum_sq_solid, always_fails_solid],
    )
Env
environment:
  inputs:
    # either the name of a named input or the name of an upstream solid
    - input_name: num
      # source defines how the input is going to be read in
      source: CSV
      args:
        path: "pandas_hello_world/num.csv"
    - input_name: sum
      # no CSVORPARQUET source, because the actual source type is only defined in the env
      source: CSV
      args:
        path: "sum.csv"
        format: 'CSV'
This allows changing the source type without touching the pipeline definition:
environment:
  inputs:
    # either the name of a named input or the name of an upstream solid
    - input_name: num
      # source defines how the input is going to be read in
      source: SQL
      args:
        sql_query: "SELECT * FROM NUM"
    - input_name: sum
      # no CSVORPARQUET source, because the actual source type is only defined in the env
      source: CSV
      args:
        path: "sum.csv"
        format: 'CSV'
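A hedged plain-Python sketch of how the same pipeline-side input could be served by either env above; `run_input` and the source registry are invented names, and the sources are stand-ins rather than real CSV/SQL readers:

```python
# Hypothetical sketch: the pipeline only names the input;
# the env decides which source feeds it.
def csv_source(args):
    # stand-in for reading args['path'] into a dataframe
    return [{'num1': 1, 'num2': 2}]

def sql_source(args):
    # stand-in for running args['sql_query'] against a database
    return [{'num1': 3, 'num2': 4}]

SOURCES = {'CSV': csv_source, 'SQL': sql_source}

def run_input(env, input_name):
    # look up the env entry for this input and dispatch on its source type
    spec = next(i for i in env['inputs'] if i['input_name'] == input_name)
    return SOURCES[spec['source']](spec['args'])

csv_env = {'inputs': [{'input_name': 'num', 'source': 'CSV',
                       'args': {'path': 'pandas_hello_world/num.csv'}}]}
sql_env = {'inputs': [{'input_name': 'num', 'source': 'SQL',
                       'args': {'sql_query': 'SELECT * FROM NUM'}}]}

# Same input name, different sources, no change to the pipeline.
a = run_input(csv_env, 'num')
b = run_input(sql_env, 'num')
```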
This keeps the ability to have some expectations of what the source should return, while not binding pipelines to concrete inputs.
def process_raw(num):
    # Some code to turn the raw binary data from the file into a dataframe
    pass

# We can still typecheck inside the solid that the output is valid
transforming_solid = dagster_pd.dataframe_solid(
    name='preprocessing',
    inputs=[
        dagster.raw_input('num'),
    ],
    transform_fn=process_raw,
)
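The in-solid typecheck mentioned above could look like the following plain-Python sketch; `check_dataframe_output` is an invented helper, and a list of dicts stands in for a real dataframe:

```python
# Hypothetical sketch: a raw input delivers bytes; the solid's transform
# produces the dataframe, and we validate the OUTPUT format afterwards.
import json

def process_raw(raw_bytes):
    # e.g. raw JSON bytes -> list-of-rows stand-in for a dataframe
    return json.loads(raw_bytes.decode('utf-8'))

def check_dataframe_output(transform_fn, raw):
    out = transform_fn(raw)
    # the typecheck: the solid promised a dataframe, so verify the shape
    if not (isinstance(out, list) and all(isinstance(r, dict) for r in out)):
        raise TypeError('transform did not return a dataframe-like value')
    return out

rows = check_dataframe_output(process_raw, b'[{"num1": 1, "num2": 2}]')
```

This keeps the guarantee on the solid's output even though the input itself is untyped raw data.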
OK, I'll put it even more briefly. Currently you need to specify inputs=[dagster_pd.csv_input('num_csv')]. This binds the pipeline and solid to a concrete input source. In addition, when you specify inputs=[dagster_pd.depends_on(solid)], it gets the weird CSVORPARQUET input source type, exactly because an input has opinions about what its sources should be.

I propose that any input source can be used for any input, without specifying it in the pipeline, as long as the result of that source is compatible. So the first example becomes inputs=[dagster_pd.dataframe_input(name='num')]. Then you can specify a CSV source for it. Or a SQL source. Or a Parquet source. As long as all those sources return a dataframe.

For raw data (like reading a JSON file), we use dagster.raw_input or e.g. dagster.json_input. Again, the actual source is specified in the env.