openlineage / openlineage
An Open Standard for lineage metadata collection
Home Page: http://openlineage.io
License: Apache License 2.0
Need support for an S3 operator and extractor in marquez_airflow to run a DAG.
Purpose:
A lot of data projects are written in Python. Having a standard Python client would ease the implementation of OpenLineage on both sides: event producers don't need to write their own clients, and consumers can use it for tests. Another benefit of a standard client is increased conformance to the schema.
Proposed implementation
In the future, the best way to keep clients conformant to the spec would be to autogenerate them. The scope of this proposal is to add a simple handwritten client using the requests library.
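A minimal sketch of what such a handwritten client could look like. Class and method names here (OpenLineageClient, build_event, emit) are assumptions, not the eventual API, and stdlib urllib stands in for the requests library so the sketch stays dependency-free:

```python
import json
import urllib.request
from datetime import datetime, timezone


class OpenLineageClient:
    """Hypothetical minimal client; the proposal suggests requests,
    but stdlib urllib is used here to keep the sketch self-contained."""

    def __init__(self, url, producer):
        self.url = url
        self.producer = producer

    def build_event(self, event_type, job_namespace, job_name, run_id,
                    inputs=None, outputs=None):
        # Assemble a core OpenLineage run event payload.
        return {
            "eventType": event_type,
            "eventTime": datetime.now(timezone.utc).isoformat(),
            "run": {"runId": run_id},
            "job": {"namespace": job_namespace, "name": job_name},
            "inputs": inputs or [],
            "outputs": outputs or [],
            "producer": self.producer,
        }

    def emit(self, event):
        # POST the serialized event to the lineage endpoint.
        req = urllib.request.Request(
            self.url + "/api/v1/lineage",
            data=json.dumps(event).encode("utf-8"),
            headers={"Content-Type": "application/json"},
        )
        return urllib.request.urlopen(req)
```

An autogenerated client would replace the hand-assembled dict with classes derived from the spec, which is what makes generation the long-term answer to conformance.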
Each leaf in the logical plan is an input and the root is the output. When a leaf is not understood, we should track an "UnknownInput" facet with as much detail as we can extract at runtime about what this input actually is. The same applies to unknown actions/commands for the output (the root of the plan).
Purpose:
We currently have JobFacets for everything that does not change every run (example: the current version in source control, like the git SHA), and RunFacets for everything that changes every run (example: the schedule time). We are missing the same notion for input and output datasets. Currently we only have Dataset facets for things that don't change every run (example: schema). I propose adding Input and Output facets to capture things that change in datasets every run (example: number of rows produced).
Proposed implementation
Add InputFacets and OutputFacets to the core model, directly on the Input or Output field.
"inputs": {
  "description": "The set of **input** datasets.",
  "type": "array",
  "items": {
    "allOf": [
      { "$ref": "#/definitions/Dataset" },
      {
        "type": "object",
        "properties": {
          "inputFacets": {
            ...
          }
        }
      }
    ]
  }
}
Consumers should be able to receive any facets, and they need a JSON Schema that defines just the core model without any facets.
Purpose:
In a data mesh setup, an instance of a dataset (the output of a particular run id) is created when its upstream input datasets (the upstream nodes' outputs on the mesh) become available; this could be per event, per batch file, etc.
Data mesh assumes that every dataset instance is uniquely identified and addressable via a unique combination of a data node and its output port URL.
The mesh is constructed by a node linking its input ports (input datasets) to its upstream nodes' output ports (output datasets).
There are no global or central DAG definitions.
Also, the upstream nodes can be microservices and be off the mesh.
I can imagine using OpenLineage in a particular use case: for a particular dataset instance, discover the graph of upstream data-node output ports (dataset instances) that led to its creation.
Proposed implementation
There are multiple options here:
IIUC the spec, there is an assumption that the combination of a Dataset's namespace, name, and its dataSource facet uniquely addresses an instance of a dataset. In that case, the traverse function can essentially chain inputs to upstream outputs and so on.
For example in this case (I may not have followed the exact syntax):
{
  ...
  "run": {
    "runId": "345"
  },
  "job": {
    "namespace": "clinical-visits",
    "name": "clinical-visits.daily"
  },
  "inputs": [
    {
      "namespace": "members.mesh.com",
      "name": "members-daily",
      "facets": {
        "dataSource": "dm:://members.mesh.com/output-ports/members-daily/snapshots/02-22-2021"
      }
    },
    ...
  ]
}
The traverser will look up jobs with this exact output port unique address.
{
  ...
  "run": {
    "runId": "123"
  },
  "job": {
    "namespace": "members",
    "name": "members.daily"
  },
  "outputs": [
    {
      "namespace": "members.mesh.com",
      "name": "members-daily",
      "facets": {
        "dataSource": "dm:://members.mesh.com/output-ports/members-daily/snapshots/02-22-2021"
      }
    },
    ...
  ]
}
The other option would be to expect the combination of namespace and name to be a uniquely addressable instance.
I suggest the specification make an explicit statement about how this use case is expected to be implemented.
There is another question here: how is the type of an input/output dataset expected to be known? For example, how would a particular implementation recognize that this is a data mesh standard address rather than, say, a database URL?
A data-mesh-aware implementation of OpenLineage could, for example, extract useful metadata context from a data node's output port.
We may need to make the type explicit in an address schema, or as an independent attribute, to allow selecting a particular "operator" in traverse or other functions.
With the upcoming release of OpenLineage 0.1.0, we'll want to publish our artifacts via CI. Below, I propose the following release steps: bump the version in gradle.properties for openlineage-java and in setup.py for openlineage-python, then tag the release X.Y.Z on the main branch.
Note: The steps outlined above are inspired by the release steps for Marquez. Also, for step 1, we can reference the new-version.sh script used for Marquez releases.
@mobuchowski: You've already made some great additions to our CI config, this is just to capture the CI release flow.
Purpose (Why?):
Job schedulers often have their own state machine that we'd want to capture, for example to understand which events might trigger the job (or not) and why (ex: some input data became available, but not all dependencies are resolved yet).
Proposed implementation (How?):
Add a StateTransitionRunFacet capturing the new state the run is in, as well as the previous state and the event that triggered the change. Note that the new state might be the same as the previous state.
I would follow the terminology defined here: https://en.wikipedia.org/wiki/UML_state_machine
Example: StateTransition run facet
stateTransition: {
  previousState: "WAITING",
  event: "INPUT_AVAILABLE",
  newState: "RUNNING"
}
Initial questions: do we also need the notion of a guard here? (See: https://en.wikipedia.org/wiki/UML_state_machine#Guard_conditions)
In the openlineage-airflow doc we should have a section on how to configure Marquez on Astronomer.
https://www.astronomer.io/docs/cloud/stable/deploy/configure-deployment#set-environment-variables
https://github.com/OpenLineage/OpenLineage/tree/main/integration/airflow#configuration
Given a Snowflake query id, the explain plan can be retrieved with the following query (using the query id 019bbdfa-0400-571a-0000-00301d038151 as an example):
select system$explain_plan_json('019bbdfa-0400-571a-0000-00301d038151') as explain_plan;
This returns a JSON representation of the plan that should go in a Snowflake facet.
Here is an example of a plan:
{
"GlobalStats":{
"partitionsTotal":4,
"partitionsAssigned":4,
"bytesAssigned":2042880
},
"Operations":[
[
{
"id":0,
"operation":"Result",
"expressions":[
"number of rows inserted"
]
},
{
"id":1,
"parent":0,
"operation":"Insert",
"objects":[
"DEMO_DB.PUBLIC.FOO"
]
},
{
"id":2,
"parent":1,
"operation":"SortWithLimit",
"expressions":[
"sortKey: [SUM(LINEITEM.L_EXTENDEDPRICE * (1 - LINEITEM.L_DISCOUNT)) DESC NULLS FIRST]",
"rowCount: 20"
]
},
{
"id":3,
"parent":2,
"operation":"InnerJoin",
"expressions":[
"joinKey: (ORDERS.O_ORDERKEY = LINEITEM.L_ORDERKEY)"
]
},
{
"id":4,
"parent":3,
"operation":"InnerJoin",
"expressions":[
"joinKey: (NATION.N_NATIONKEY = CUSTOMER.C_NATIONKEY)"
]
},
{
"id":5,
"parent":4,
"operation":"TableScan",
"objects":[
"SNOWFLAKE_SAMPLE_DATA.TPCH_SF001.NATION"
],
"expressions":[
"N_NATIONKEY",
"N_NAME"
],
"partitionsAssigned":1,
"partitionsTotal":1,
"bytesAssigned":2048
},
{
"id":6,
"parent":4,
"operation":"InnerJoin",
"expressions":[
"joinKey: (ORDERS.O_CUSTKEY = CUSTOMER.C_CUSTKEY)"
]
},
{
"id":7,
"parent":6,
"operation":"Filter",
"expressions":[
"(ORDERS.O_ORDERDATE >= '1993-10-01') AND (ORDERS.O_ORDERDATE < '1993-11-01')"
]
},
{
"id":8,
"parent":7,
"operation":"TableScan",
"objects":[
"SNOWFLAKE_SAMPLE_DATA.TPCH_SF001.ORDERS"
],
"expressions":[
"O_ORDERKEY",
"O_CUSTKEY",
"O_ORDERDATE"
],
"partitionsAssigned":1,
"partitionsTotal":1,
"bytesAssigned":407552
},
{
"id":9,
"parent":6,
"operation":"JoinFilter",
"expressions":[
"joinKey: (NATION.N_NATIONKEY = CUSTOMER.C_NATIONKEY)"
]
},
{
"id":10,
"parent":9,
"operation":"TableScan",
"objects":[
"SNOWFLAKE_SAMPLE_DATA.TPCH_SF001.CUSTOMER"
],
"expressions":[
"C_CUSTKEY",
"C_NAME",
"C_ADDRESS",
"C_NATIONKEY",
"C_PHONE",
"C_ACCTBAL",
"C_COMMENT"
],
"partitionsAssigned":1,
"partitionsTotal":1,
"bytesAssigned":109568
},
{
"id":11,
"parent":3,
"operation":"Filter",
"expressions":[
"LINEITEM.L_RETURNFLAG = 'R'"
]
},
{
"id":12,
"parent":11,
"operation":"JoinFilter",
"expressions":[
"joinKey: (ORDERS.O_ORDERKEY = LINEITEM.L_ORDERKEY)"
]
},
{
"id":13,
"parent":12,
"operation":"TableScan",
"objects":[
"SNOWFLAKE_SAMPLE_DATA.TPCH_SF001.LINEITEM"
],
"expressions":[
"L_ORDERKEY",
"L_EXTENDEDPRICE",
"L_DISCOUNT",
"L_RETURNFLAG"
],
"partitionsAssigned":1,
"partitionsTotal":1,
"bytesAssigned":1523712
}
]
]
}
For the following query:
insert into DEMO_DB.PUBLIC.foo (c_custkey, c_name, revenue, c_acctbal,
n_name,
c_address,
c_phone,
c_comment) SELECT
c_custkey,
c_name,
sum(l_extendedprice * (1 - l_discount)) as revenue,
c_acctbal,
n_name,
c_address,
c_phone,
c_comment
FROM
customer,
orders,
lineitem,
nation
WHERE
c_custkey = o_custkey
AND l_orderkey = o_orderkey
AND o_orderdate >= date '1993-10-01'
AND o_orderdate < date '1993-11-01'
AND l_returnflag = 'R'
AND c_nationkey = n_nationkey
GROUP BY
c_custkey,
c_name,
c_acctbal,
c_phone,
n_name,
c_address,
c_comment
ORDER BY
revenue desc
LIMIT 20;
Note that the "operation":"TableScan" nodes have the input tables in their objects field, and in this case the "operation":"Insert" node has the output table in its objects field.
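A sketch of how a Snowflake extractor might pull lineage out of such a plan. The function name and the handling of the Operations nesting are assumptions based on the example plan above:

```python
def extract_lineage(plan):
    """Walk a Snowflake explain-plan JSON and collect input tables
    (from TableScan nodes) and output tables (from Insert nodes)."""
    inputs, outputs = [], []
    # "Operations" is a list of steps, each itself a list of plan nodes.
    for step in plan.get("Operations", []):
        for node in step:
            if node.get("operation") == "TableScan":
                inputs.extend(node.get("objects", []))
            elif node.get("operation") == "Insert":
                outputs.extend(node.get("objects", []))
    return inputs, outputs
```

Run against the example plan above, this would yield the four SNOWFLAKE_SAMPLE_DATA.TPCH_SF001 tables as inputs and DEMO_DB.PUBLIC.FOO as the output.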
Currently the OpenAPI spec is published here: https://github.com/OpenLineage/OpenLineage.github.io/tree/main/openapi
The Javadoc for the Java client is here: https://github.com/OpenLineage/OpenLineage.github.io/tree/main/javadoc
We need to hook this into CI so both are published automatically when the main branch is updated.
To expose information like the version of the job and its source repository, Marquez needs a mechanism to provide metadata in the job package when deploying to a scheduler.
An example of this is deploying DAGs to an Airflow instance, but this is applicable to all package deployments in general.
In some instances, workflows/jobs are pulled directly from a source control system by the scheduler, allowing some metadata to be inferred from the environment. In other instances, the jobs are packaged and deployed to a scheduler (example: Google Cloud Composer or Astronomer). This doc concerns the latter case of providing metadata related to the source code of the job at build time.
We define a metadata file to be added at the root of the package/deployment location. This file is named .OpenLineage.job.facets.json and is a JSON file with the following initial properties (which can be expanded later).
{
  "$schema": "https://json-schema.org/draft/2019-09/schema",
  "type": "object",
  "properties": {
    "sourceCodeLocation": {
      "type": "object",
      "properties": {
        "type": { "type": "string" },
        "url": { "type": "string" },
        "path": { "type": "string" },
        "version": { "type": "string" },
        "branch": { "type": "string" }
      },
      "required": ["type", "url", "path", "version", "branch"]
    }
  },
  "required": ["sourceCodeLocation"]
}
For the DAG defined at https://github.com/MarquezProject/marquez-airflow-quickstart/blob/693e35482bc2e526ced2b5f9f76ef83dec6ec691/dags/dummy_example.py, the metadata file would contain:
{
  "sourceCodeLocation": {
    "type": "GIT",
    "url": "https://github.com/MarquezProject/marquez-airflow-quickstart.git",
    "path": "dags",
    "version": "693e35482bc2e526ced2b5f9f76ef83dec6ec691",
    "branch": "master"
  }
}
The SparkListener approach was already implemented in the Spark integration prior to the code being moved from Marquez to OpenLineage. Much of the context here is obsolete. However, the extensibility issue still needs to be addressed.
The Spark integration relies on instrumentation achieved by adding a jar to the javaagent argument on the command line when starting a Spark job. When the JVM starts up, the application ClassLoader loads both the instrumentation jar and the Spark, Hadoop, and other dependency jars that are on the classpath. When the SparkContext is loaded, Spark parses its own configuration and finds additional jars that have been specified, either on the command line or in a configuration map that can be populated at runtime. These jars are (sometimes) downloaded and added to a MutableURLClassLoader that is owned and managed by the SparkContext.
Since the Marquez integration is loaded by the top-level application ClassLoader, and Java's ClassLoaders are ignorant of their child ClassLoaders, the integration can only find classes loaded by that top-level ClassLoader. BigQuery and other dependencies may be specified in the spark.jars configuration, so they are loaded by the child MutableURLClassLoader and are hence invisible to the Marquez agent classes. While the integration can detect that BigQuery classes are available, attempting to instantiate the classes that actually deal with them causes NoClassDefFoundErrors, since the ClassLoader can't find the classes loaded by the child loader.
The current workaround relies on instrumentation to invoke methods from the parent ClassLoader without needing to directly access the BigQueryRelation class or related classes.
I think the instrumentation approach has drawbacks, though to be fair it also has advantages (e.g., generating code conditionally on class availability, as in if (class.hasMethod("X")) generateXSpecificCode()). Reflection has been suggested as an alternative strategy, but I think the pros/cons are largely the same as the instrumentation approach: it uses indirect method invocation to circumvent static type checking.
I propose a more extensible method of loading and discovering node visitors. The basic strategy is to define visitors that implement the PartialFunction<LogicalPlan, List<Dataset>> interface we already use, and to load them via the spark.jars.packages or spark.jars configuration properties. The main advantage I see here is that integrations can be statically typed against the datasource classes they handle, such as BigQueryRelation and HBaseRelation. The tradeoff is that users need to declare the OpenLineage integration dependencies and deploy them separately from the main jar. Statically typed dataset integrations should be loaded by the Spark ClassLoader (e.g., declared in the spark.jars.packages or spark.jars configuration) in order to link to the datasource classes they depend on. In terms of developer experience, I don't think this pattern is foreign to Spark users today.
Notably, nothing prohibits users from bundling their own superjars that contain all of their dependencies and deploying them to their Spark clusters. Similarly, nothing prevents integration authors from using reflection in their code. Users are given the freedom to extend and package their integrations in a way that suits them without enforcing an opaque, black-box model.
As a separate issue, we can consider changing the OpenLineage agent to be loaded via the spark.extraListeners configuration. This would allow users to reference all of the OpenLineage integration dependencies by specifying the listener and the node visitors in spark.jars.packages, so that all of the classes are loaded by the Spark ClassLoader. That eliminates any confusion about which components need to be loaded by which ClassLoader. It also simplifies testing and could enable integration in serverless environments, such as AWS Glue, where users can't modify the JVM arguments.
This has already been implemented.
Spark integration packages need to create a file at META-INF/services/marquez.spark.agent.lifecycle.plan.DatasetSource that declares a list of classes implementing the openlineage.spark.agent.lifecycle.plan.DatasetSource interface. During startup, the main agent uses the JVM ServiceLoader to find all service implementations on the classpath (we need to specifically check the Spark URLClassLoader to find implementations loaded by Spark at runtime). Combining jars requires concatenating the contents of the META-INF/services/marquez.spark.agent.lifecycle.plan.DatasetSource files so that all implementations are listed.
Currently, users deploy the Marquez integration by adding the javaagent:<jar>=http://marquez_url/path/to/job argument to their spark-submit args. That doesn't change with this proposal. What does change is that users would be required to specify the integrations they need in their spark.jars or spark.jars.packages configuration. As an example, the Airflow example in the README would look like this:
from uuid import uuid4
import os
...
job_name = 'job_name'
properties = {
'spark.extraListeners': 'openlineage.spark.agent.OpenLineageSparkListener',
    'spark.jars.packages': 'io.openlineage:openlineage-spark:0.0.1',
'spark.openlineage.host': os.environ.get('MARQUEZ_URL'),
'spark.openlineage.namespace': os.getenv('MARQUEZ_NAMESPACE', 'default'),
'spark.openlineage.apiKey': os.environ.get('MARQUEZ_API_KEY'),
'spark.openlineage.parentRunId': "{{task_run_id(run_id, task)}}",
'spark.openlineage.parentJobName': '{{ task.task_id }}'
}
t1 = DataProcPySparkOperator(
task_id=job_name,
gcp_conn_id='google_cloud_default',
project_id='project_id',
cluster_name='cluster-name',
region='us-west1',
main='gs://bucket/your-prog.py',
job_name=job_name,
dataproc_pyspark_properties=properties,
    # Both the BigQuery relation and the BigQuery OpenLineage integration are declared here
dataproc_pyspark_jars='gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar,gs://my_bucket/path/to/bigquery-integration.jar',
dag=dag)
Alternatively, a cluster can be created by loading the correct jars into the Spark jars directory, as described in the BigQuery connector docs. In the same way, the MarquezOpenLineage integrations would need to be installed on the driver node. In either case, the person responsible for loading the bigquery jars on the classpath is also responsible for loading the Marquez integration. Dependencies can be bundled so that the Marquez integration and the BigQueryRelation class are all loaded in the same jar.
This is now replaced by the OpenLineage API
Purpose:
Many databases have explicit versioning in place for datasets. It is highly desirable to track the specific version of the dataset if it is known at consumption/generation time. Rather than relying on the service to correctly tie job inputs to specific versions of a dataset, the client should be allowed to report that information if it is available.
Proposed implementation
A dataset version facet may be sufficient to report dataset version information where it exists. Custom facets can be used to include any database-specific version information, while the minimum required fields should include a versionName string field.
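A sketch of the proposed facet shape. The helper name and the extra snapshotId key are illustrative; only versionName is the proposed required field:

```python
# Hypothetical shape of a dataset version facet: `versionName` is the
# one required field, and any database-specific version information
# rides along as additional keys in a custom facet.
def version_facet(version_name, **extra):
    facet = {"versionName": version_name}
    facet.update(extra)  # e.g. an engine-specific snapshot identifier
    return facet


# Illustrative usage; the snapshotId value here is made up.
facet = version_facet("v2021-02-22", snapshotId="snapshot-42")
```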
Purpose:
We need to define how the OpenLineage spec gets versioned and published.
Some requirements:
Proposed implementation
Version the spec through the JSON Schema "$id" field:
"$id": "https://openlineage.io/spec/1-0-0/OpenLineage.json"
Implementation plan:
The discussion is available in a Google doc:
https://docs.google.com/document/d/1inhmb90SB62VyYf8nkkpjBDPpyr2wxkIXQ_AFIQiF9I/edit
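Under this scheme, a consumer could recover the spec version directly from the $id URL. A minimal sketch, assuming the /spec/<major>-<minor>-<patch>/ path convention shown above:

```python
import re


def spec_version(schema_id):
    """Extract the version segment from a spec "$id" URL such as
    https://openlineage.io/spec/1-0-0/OpenLineage.json."""
    match = re.search(r"/spec/(\d+-\d+-\d+)/", schema_id)
    return match.group(1) if match else None
```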
The Marquez community is excited to announce that the following integrations will be moved to the wider OpenLineage ecosystem:
marquez-airflow: A library that integrates Airflow DAGs to emit OpenLineage metadata.
marquez-spark: A Spark agent using JVM instrumentation to emit OpenLineage metadata.
marquez-integration-common: Shared code across integrations.
marquez-dbt-snowflake: A library that integrates with dbt to emit OpenLineage metadata for Snowflake.
marquez-dbt-bigquery: A library that integrates with dbt to emit OpenLineage metadata for BigQuery.
Note: You can view the integrations in the Marquez repo under marquez/integrations/. Marquez is an official reference implementation of the OpenLineage API, and all the integrations adopt the OpenLineage standard.
The OpenLineage standard was influenced by Marquez's underlying data model (datasets, jobs, runs) while also drawing inspiration from the project's lineage ingestion API. With OpenLineage now a standalone project under the LF AI & Data Foundation, we've decided that now is the right time to move the integrations over, as more and more integrations are being added to the project's roadmap (Looker, Flink, Beam, etc.). This will also allow the OpenLineage community to grow and learn from existing integration implementations, prioritize bug fixes, propose new integrations and changes to the spec, etc.
Below we outline a high-level timeline of moving the integrations to OpenLineage. We encourage everyone to provide feedback and express any concerns you may have by commenting in this issue:
0.16.0
Then, going forward, new integrations and improvements will be made by opening pull requests against the OpenLineage repo. Note that during this migration phase, existing Marquez-specific functionality and environment variables will be maintained to ensure backwards compatibility. The main difference is that integration packages will get a new name: openlineage-* instead of marquez-*.
Marquez doesn't collect dataset metadata for BigQueryOperator in Airflow. To collect BigQuery metadata, I tried the Airflow example from the repo, but used BigQueryOperator instead of PostgresOperator. For example, I modified the operators in counter.py as:
import random

from airflow.contrib.operators.bigquery_operator import BigQueryOperator
t1 = BigQueryOperator(
task_id='if_not_exists',
bigquery_conn_id=BQ_CONN_ID,
sql='''
CREATE TABLE IF NOT EXISTS `my-project.my_dataset.counts` (
value INTEGER
);''',
use_legacy_sql=False,
dag=dag
)
t2 = BigQueryOperator(
task_id='inc',
bigquery_conn_id=BQ_CONN_ID,
sql='''
INSERT INTO `my-project.my_dataset.counts` (value)
VALUES ({{params.value}})
''',
params={
"value": random.randint(1, 10)
},
use_legacy_sql=False,
dag=dag
)
The DAGs ran successfully in Airflow, and the counts and sums tables were created on GCP; in the Marquez UI, metadata can be seen for jobs but not for datasets.
We need a naming spec for jobs, similar to the dataset naming strategy.
The Python client currently allows setting the OpenLineage producer via an env var: OPENLINEAGE_PRODUCER. We'll want to allow users to configure the producer via the client constructor.
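A minimal sketch of the constructor change, with the explicit argument taking precedence over the existing env var. The class shape here is an assumption, not the actual openlineage-python API:

```python
import os


class OpenLineageClient:
    """Sketch: accept the producer as a constructor argument, falling
    back to the existing OPENLINEAGE_PRODUCER environment variable."""

    def __init__(self, url, producer=None):
        self.url = url
        # Explicit argument wins; env var is the backwards-compatible default.
        self.producer = producer or os.environ.get("OPENLINEAGE_PRODUCER")
```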
Purpose:
OpenAPI and JSON Schema are mostly the same model, with a goal for OpenAPI to become a strict superset of JSON Schema in the near future.
The goal is to make OpenLineage independent from the underlying protocol. OpenAPI assumes a REST endpoint, which is not our intent here. This change would still make it possible to define an OpenLineage HTTP endpoint, while decoupling the model and making it more obvious that it can be used with Kafka, for example.
Proposed implementation
Convert the OpenAPI spec to a JSON Schema.
Right now, we hook into Airflow by subclassing DAG and asking users to use our subclass instead of airflow.models.DAG.
Explore hooking our changes to DAG via the plugin mechanism: https://airflow.apache.org/docs/apache-airflow/stable/plugins.html
Discovered when running tests: when a DataFrame is converted to an RDD (such as when writing to a JDBC relation), the Marquez listener is triggered twice, once for the DataFrame operation and again for the RDD operation. Fortunately, the original DataFrame LogicalPlan contains the entire input/output graph, so no information is lost. However, the second set of events posts a new job with no inputs or outputs to the Marquez endpoint.
Example of two posted events- one from the DataFrame and one from the RDD:
Publishing sql event LineageEvent(eventType=COMPLETE, eventTime=2021-01-01T00:00Z, run=LineageEvent.Run(runId=39f36ea5-bd33-4ff5-9a64-00fa2f439d21, facets=LineageEvent.RunFacet(nominalTime=null, parent=LineageEvent.ParentRunFacet(run=LineageEvent.RunLink(runId=ab478a6e-0226-46bb-b5c3-2b23179a1277), job=LineageEvent.JobLink(namespace=Namespace, name=ParentJob)), additional={spark.logicalPlan=LogicalPlanFacet(plan=[{"class":"org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand","num-children":0,"query":[{"class":"org.apache.spark.sql.catalyst.plans.logical.Filter","num-children":1,"condition":[{"class":"org.apache.spark.sql.catalyst.expressions.GreaterThan","num-children":2,"left":0,"right":1},{"class":"org.apache.spark.sql.catalyst.expressions.AttributeReference","num-children":0,"name":"age","dataType":"long","nullable":true,"metadata":{},"exprId":{"product-class":"org.apache.spark.sql.catalyst.expressions.ExprId","id":6,"jvmId":"2bb20251-cb0e-4586-a2b0-ae90290f14e4"},"qualifier":[]},{"class":"org.apache.spark.sql.catalyst.expressions.Cast","num-children":1,"child":0,"dataType":"long","timeZoneId":"America/Los_Angeles"},{"class":"org.apache.spark.sql.catalyst.expressions.Literal","num-children":0,"value":"16","dataType":"integer"}],"child":0},{"class":"org.apache.spark.sql.execution.datasources.LogicalRelation","num-children":0,"relation":null,"output":[[{"class":"org.apache.spark.sql.catalyst.expressions.AttributeReference","num-children":0,"name":"age","dataType":"long","nullable":true,"metadata":{},"exprId":{"product-class":"org.apache.spark.sql.catalyst.expressions.ExprId","id":6,"jvmId":"2bb20251-cb0e-4586-a2b0-ae90290f14e4"},"qualifier":[]}],[{"class":"org.apache.spark.sql.catalyst.expressions.AttributeReference","num-children":0,"name":"name","dataType":"string","nullable":true,"metadata":{},"exprId":{"product-class":"org.apache.spark.sql.catalyst.expressions.ExprId","id":7,"jvmId":"2bb20251-cb0e-4586-a2b0-ae90290f14e4"},"qualifier":[]}]],"isS
treaming":false}],"dataSource":null,"options":null,"mode":null}])})), job=LineageEvent.Job(namespace=Namespace, name=word_count.execute_save_into_data_source_command, facets=null), inputs=[LineageEvent.Dataset(namespace=sqlite:file:///var/folders/x_/47q8z0xx2357hrl3w6k9xf_80000gn/T/junit12164537964587498492/sqlite/database, name=data_table, facets=LineageEvent.DatasetFacet(documentation=null, schema=LineageEvent.SchemaDatasetFacet(fields=[LineageEvent.SchemaField(name=age, type=decimal(20,0), description=null), LineageEvent.SchemaField(name=name, type=string, description=null)]), dataSource=LineageEvent.DatasourceDatasetFacet(name=sqlite:file:///var/folders/x_/47q8z0xx2357hrl3w6k9xf_80000gn/T/junit12164537964587498492/sqlite/database, uri=sqlite:file:///var/folders/x_/47q8z0xx2357hrl3w6k9xf_80000gn/T/junit12164537964587498492/sqlite/database), description=null, additional={}))], outputs=[LineageEvent.Dataset(namespace=sqlite:file:///var/folders/x_/47q8z0xx2357hrl3w6k9xf_80000gn/T/junit12164537964587498492/sqlite/database, name=data_table, facets=LineageEvent.DatasetFacet(documentation=null, schema=LineageEvent.SchemaDatasetFacet(fields=[LineageEvent.SchemaField(name=age, type=decimal(20,0), description=null), LineageEvent.SchemaField(name=name, type=string, description=null)]), dataSource=LineageEvent.DatasourceDatasetFacet(name=sqlite:file:///var/folders/x_/47q8z0xx2357hrl3w6k9xf_80000gn/T/junit12164537964587498492/sqlite/database, uri=sqlite:file:///var/folders/x_/47q8z0xx2357hrl3w6k9xf_80000gn/T/junit12164537964587498492/sqlite/database), description=null, additional={}))], producer=https://github.com/MarquezProject/marquez/tree/0.12.0/integrations/spark)
Publishing RDD event LineageEvent(eventType=COMPLETE, eventTime=2021-01-01T00:00Z, run=LineageEvent.Run(runId=ab478a6e-0226-46bb-b5c3-2b23179a1277, facets=LineageEvent.RunFacet(nominalTime=null, parent=LineageEvent.ParentRunFacet(run=LineageEvent.RunLink(runId=ab478a6e-0226-46bb-b5c3-2b23179a1277), job=LineageEvent.JobLink(namespace=Namespace, name=ParentJob)), additional={})), job=LineageEvent.Job(namespace=Namespace, name=word_count, facets=null), inputs=[], outputs=[], producer=https://github.com/MarquezProject/marquez/tree/0.12.0/integrations/spark)
Note that the RDD event has no inputs/outputs and has a distinct run id.
Purpose:
Currently, the Run object has a runId property that is defined as "The id of the run, unique relative to the job" (https://github.com/OpenLineage/OpenLineage/blob/main/spec/OpenLineage.json#L61-L63). This implies a run id may be reused across jobs, so that to link a job run to a parent run, the job name and namespace are required in addition to the run id (see https://github.com/OpenLineage/OpenLineage/blob/main/spec/OpenLineage.json#L273-L311).
In practice, Marquez generates UUIDs for the job run ids (e.g., see the airflow integration at https://github.com/MarquezProject/marquez/blob/main/integrations/airflow/marquez_airflow/dag.py#L228-L229 ). In the spark integration, there's confusion about whether the passed job run parameters belong to the parent run or to a new job run (see the README at https://github.com/MarquezProject/marquez/blob/main/integrations/spark/README.md and the context at https://github.com/MarquezProject/marquez/blob/main/integrations/spark/src/main/java/marquez/spark/agent/MarquezContext.java#L29-L31 ).
Given the implementation already uses UUIDs for job runs, I think the requirement to pass the additional job information is a bit burdensome and redundant. It also leaves room for confusion in implementation projects, which would be nice to eliminate.
Proposed implementation
I propose requiring job run ids to be globally unique (at least within a given OpenLineage backend) and removing the job parameter from the ParentRunFacet. Integrations should pass around these globally unique runIds to link job runs to parents. This is in line with the existing Marquez model (see https://github.com/MarquezProject/marquez/blob/main/clients/python/marquez_client/client.py#L325-L327) and will reduce confusion in integration implementations, such as the Spark integration project.
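Under this proposal, linking a child run to its parent needs nothing beyond the parent's globally unique id. A minimal sketch (the helper names are illustrative):

```python
import uuid


def new_run_id():
    """Globally unique run id; no job name/namespace is needed to
    disambiguate it across jobs."""
    return str(uuid.uuid4())


def parent_run_facet(parent_run_id):
    # With globally unique run ids, the facet only needs the parent's
    # run id; the `job` link proposed for removal is simply absent.
    return {"run": {"runId": parent_run_id}}


parent = new_run_id()
child_run_facets = {"parent": parent_run_facet(parent)}
```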
Add lineage coverage for the redshift <-> s3 operators
https://github.com/apache/airflow/blob/v1-10-stable/airflow/operators/redshift_to_s3_operator.py
https://github.com/apache/airflow/blob/v1-10-stable/airflow/operators/s3_to_redshift_operator.py
Please refer to the OpenLineage naming spec:
https://github.com/OpenLineage/OpenLineage/pull/31/files
The goal is to have a Backend abstraction:
java: https://github.com/MarquezProject/marquez/blob/main/clients/java/src/main/java/marquez/client/Backend.java
python: https://github.com/MarquezProject/marquez/blob/main/clients/python/marquez_client/backend.py
that can be configured to send HTTP POSTs or Kafka events.
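The shape of such an abstraction could be sketched as below. The class names are assumptions modeled on the linked Marquez Backend interfaces; an in-memory backend stands in for the HTTP and Kafka transports, which would require network or client-library dependencies:

```python
from abc import ABC, abstractmethod


class Backend(ABC):
    """Transport abstraction: the client builds lineage events, and a
    configured Backend ships them (HTTP POST, Kafka produce, etc.)."""

    @abstractmethod
    def emit(self, event: dict) -> None:
        ...


class InMemoryBackend(Backend):
    """Collects events in memory; useful for tests. An HttpBackend or
    KafkaBackend would implement emit() against a real transport."""

    def __init__(self):
        self.events = []

    def emit(self, event):
        self.events.append(event)
```

Swapping transports then becomes a configuration choice rather than a code change in the client.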
Purpose:
The main motivation of this proposal is making sure that custom facets in different OpenLineage client implementations for the same data source have the same name. Another advantage is a decreased chance of different facet names colliding.
Proposed implementation
I propose that custom facets follow the name pattern {prefix}{name}{entity}Facet, PascalCased.
An important aspect for OpenLineage consumers is standardizing JSON key naming. I propose to follow the pattern {prefix}_{name}, with camelCased components joined by an underscore.
An example of a custom facet following both patterns: BigQueryStatisticsJobFacet with the JSON key bigQuery_statistics.
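The two patterns can be expressed as simple helpers. These functions are illustrative, not part of any client; components are assumed to be pre-cased (PascalCase for class names, camelCase for JSON keys):

```python
def facet_class_name(prefix, name, entity):
    """Compose {prefix}{name}{entity}Facet from PascalCased components,
    e.g. ("BigQuery", "Statistics", "Job")."""
    return f"{prefix}{name}{entity}Facet"


def facet_json_key(prefix, name):
    """Compose the {prefix}_{name} JSON key from camelCased components,
    e.g. ("bigQuery", "statistics")."""
    return f"{prefix}_{name}"
```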
Proposal
IIUC, the specification expects, by default, each Dataset to encode its schema inline through SchemaDatasetFacet. I suggest allowing the schema to be either encoded inline or referenced via an external schema file URL, according to the schema language of choice of the underlying system.
For example, in a data mesh implementation, each uniquely addressable data node provides a schema endpoint (URL) for its output datasets. The schema language can follow a different standard, for example one based on http://json-schema.org/, that encapsulates the semantics and syntax of the datasets.
The schema URL associated with the data mesh node is versioned with the data and localized with the dataset itself.
Encoding the schema file in the lineage format (e.g., encoding recognized fields such as types and leaving out unrecognized fields such as range, valid values, etc.) requires writing adapters and extending the implementation of the underlying serializer.
However, leaving the schema as simply a URL to whatever schema format/file/location the underlying mesh or lake uses does not require such an extension, at least until some functionality of the implementation really needs to parse the schema, which I don't think is a first-class concern. I could be wrong.
Proposed Implementation
Make an external URL a first-class member of SchemaDatasetFacet, and make the inline schema definition optional.
From LFAI&Data:
We typically recommend that projects include a short license notice in each project source code file, wherever reasonably possible (e.g., don't worry about this for image files, formats without comments like JSON, etc).
Adding these notices has several benefits, including improving code reuse in other projects by making sure that the license info travels along with the source code file if it is used elsewhere. They can be easily added using a one-line comment with an SPDX short-form identifier, in the following format (adjusting for the file's particular format for comments):
# SPDX-License-Identifier: Apache-2.0
More information on this format is available at https://spdx.dev/ids. It looks like several of the documentation files have these already (correctly with the documentation's CC-BY-4.0 license), so I'm noting this primarily for source code files.
I'm proposing to use JSON Schema to formalize the core OpenLineage model.
JSON Schema has the advantage of making it easier to decentralize schema definitions.
We can use that spec to generate a binary format like Avro or Protobuf.
Airflow allows users to define an SLA for a task.
This should be captured as an Airflow custom facet when available.
Purpose:
Consider an event like owner_updated: we will need the same asset as part of both inputs and outputs. This event takes an existing asset as input, so the asset is part of the input assets. The event also updates the metadata of the asset, outputting a new state of it, so the asset is part of the output assets as well.
We should discuss how best to represent such events in the OpenLineage format.
If I have a job scheduled to push a partition to a table each hour, and a job running daily that consumes the last 24 partitions created by the first job, would Marquez be able to model this dependency?
I don't see a way to use multiple datasetVersions as inputs in a request.