openlineage / openlineage

An Open Standard for lineage metadata collection

Home Page: http://openlineage.io

License: Apache License 2.0

Java 65.62% Python 24.60% Shell 1.56% Dockerfile 0.14% PowerShell 0.02% Rust 4.98% Groovy 0.34% Smarty 0.06% PLpgSQL 1.18% Ruby 0.36% Scala 0.03% Kotlin 0.79% Jinja 0.27% Makefile 0.02% Batchfile 0.02%

openlineage's People

Contributors

ashulmanwework, collado-mike, d-m-h, denimalpaca, dependabot[bot], dolfinus, fm100, gaborbernat, harels, henneberger, howardyoo, huangzhenqiu, jdardagran, julienledem, kacpermuda, mattiabertorello, merobi-hub, mobuchowski, mzareba382, nataliezeller1, oleksandrdvornik, pawel-big-lebowski, renovate[bot], roaraya8, rossturk, sekikn, sreev, tnazarew, wjohnson, wslulciuc


openlineage's Issues

[PROPOSAL] Add standard python client

Purpose:
Many data projects are written in Python.
A standard Python client would ease the implementation of OpenLineage on both sides: event producers would not need to write their own clients, and consumers could use it for tests. Another benefit of a standard client is increased conformance to the schema.

Proposed implementation
In the future, the best way to keep clients conformant to the spec would be to autogenerate them.
The scope of this proposal is to add a simple handwritten client using the requests library.
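
A minimal sketch of such a handwritten client. The class name, constructor arguments, and the /api/v1/lineage path are illustrative assumptions, not part of the spec:

import uuid
from datetime import datetime, timezone

import requests


class OpenLineageClient:
    def __init__(self, url, api_key=None):
        # Assumes an HTTP backend that accepts OpenLineage run events as JSON.
        self.url = url.rstrip("/")
        self.session = requests.Session()
        if api_key:
            self.session.headers["Authorization"] = f"Bearer {api_key}"

    def emit(self, event):
        # POST the event and fail loudly on HTTP errors.
        response = self.session.post(f"{self.url}/api/v1/lineage", json=event, timeout=5)
        response.raise_for_status()


# Example usage: emit a START event for a run of "my_job".
client = OpenLineageClient("http://localhost:5000")
client.emit({
    "eventType": "START",
    "eventTime": datetime.now(timezone.utc).isoformat(),
    "run": {"runId": str(uuid.uuid4())},
    "job": {"namespace": "my-namespace", "name": "my_job"},
    "inputs": [],
    "outputs": [],
    "producer": "https://github.com/OpenLineage/OpenLineage",
})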

Add a facet reporting unknown inputs/outputs in marquez-spark

Each leaf in the logical plan is an input and the root is the output.
When a leaf is not understood, we should track an "UnknownInput" facet with as much detail as we can extract at runtime about what this input actually is.
The same applies to unknown actions/command for the output (root of the plan).
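
A hypothetical shape for such a facet; the field names below are illustrative only and not defined by the spec:

# Hypothetical "unknown input" facet payload; the keys are illustrative only.
unknown_input_facet = {
    "unknownInputs": [
        {
            "logicalPlanNode": "org.apache.spark.sql.execution.datasources.LogicalRelation",
            "details": "raw string representation of the unrecognized leaf, captured at runtime",
        }
    ]
}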

[PROPOSAL] Add InputFacets and OutputFacets

Purpose:
We currently have JobFacets for everything that does not change every run (example: the current version in source control, like the git sha), and RunFacets for everything that changes every run (example: the schedule time).
We are missing the same notion for input and output datasets.
Currently we only have Dataset facets for things that don't change every run (example: schema). I propose adding Input and Output facets to capture things that change in datasets every run (example: number of rows produced, etc.).

Proposed implementation
Add InputFacets and OutputFacets to the core model, directly in the Input or Output field, as in the schema fragment below; an illustrative event example follows the fragment.

"inputs": {
          "description": "The set of **input** datasets.",
          "type": "array",
          "items": {
            "allOf": [
              { "$ref": "#/definitions/Dataset" },
              {
                "type": "object",
                "properties": {
                  "inputFacets": {
                    ...
                  }
                }
              }
            ]
          }
        }
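
For illustration, an input dataset in an emitted event could then carry run-scoped statistics alongside its static facets. The "stats" facet name and its fields below are hypothetical:

# Hypothetical example: "facets" hold per-dataset metadata that is stable across runs,
# while "inputFacets" hold values that change every run.
input_dataset = {
    "namespace": "my-datasource",
    "name": "public.orders",
    "facets": {
        "schema": {"fields": [{"name": "order_id", "type": "BIGINT"}]},
    },
    "inputFacets": {
        "stats": {"rowCount": 14230, "bytes": 2042880},
    },
}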

Add a Consumer spec

Consumers should be able to receive any facet, so they need a JSON Schema that defines just the core model without any facets.

[PROPOSAL] Enable ability to traverse a graph of upstream dataset instances

Purpose:

In a data mesh setup an instance of a dataset (output of a particular run id) is created when its upstream input datasets (upstream nodes' outputs on the mesh) become available; this could be per event, or per batch file, etc.
Data mesh assumes that every dataset instance is uniquely identified and addressable via a unique combination of a data node and its output port URL.
The mesh is constructed from a node linking its input ports (input datasets) to its upstream nodes' output ports (output datasets).
There are no global or central DAG definitions.
Also, the upstream nodes can be microservices, and be off the mesh.

I can imagine using OpenLineage in a particular use case:
for a particular dataset instance, discover the graph of upstream data-nodes:output-ports (dataset instances) that led to its creation.

Proposed implementation

There are multiple options here:
IIUC the spec, there is an assumption that the combination of Dataset namespace, name, and its dataSource facet uniquely addresses an instance of a dataset. In that case, the traverse function can essentially chain the inputs to upstream outputs, and so on.

For example in this case (I may not have followed the exact syntax):

{
  ...
  "run": {
    "runId": "345"
  },
  "job": {
    "namespace": "clinical-visits",
    "name": "clinical-visits.daily"
  },
  "inputs": [
    {
      "namespace": "members.mesh.com",
      "name": "members-daily",
      "facets": {
        "dataSource": "dm:://members.mesh.com/output-ports/members-daily/snapshots/02-22-2021"
      }
    },
    ...
  ]
}

The traverser will look up jobs with this exact output port unique address.

{
  ...
  "run": {
    "runId": "123"
  },
  "job": {
    "namespace": "members",
    "name": "members.daily"
  },
  "outputs": [
    {
      "namespace": "members.mesh.com",
      "name": "members-daily",
      "facets": {
        "dataSource": "dm:://members.mesh.com/output-ports/members-daily/snapshots/02-22-2021"
      }
    },
    ...
  ]
}

The other option would be to expect the combination of namespace and name to be a uniquely addressable instance.

I suggest the specification make an explicit statement about how this use case is expected to be implemented.

There is another question here:
how the type of an input/output dataset is expected to be known. For example, how would a particular implementation recognize that an address is a data mesh standard address rather than simply a database URL?
A data-mesh-aware implementation of OpenLineage could, for example, extract useful metadata context from a data node's output port.
The type may need to be made explicit in an address scheme or as an independent attribute, to allow selecting a particular "operator" in traverse or other functions; a sketch of the traversal is shown below.
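
A minimal sketch of the first option, assuming events are available in some queryable store and that the dataSource facet uniquely addresses a dataset instance; the data structures and function are illustrative:

from collections import deque


def upstream_graph(events, start_address):
    # Index run events by the addresses of the dataset instances they produced.
    producers = {}
    for event in events:
        for output in event.get("outputs", []):
            address = output.get("facets", {}).get("dataSource")
            if address:
                producers[address] = event

    # Walk from the starting dataset instance to every upstream instance that led to it.
    visited, edges = set(), []
    queue = deque([start_address])
    while queue:
        address = queue.popleft()
        event = producers.get(address)
        if event is None or address in visited:
            continue
        visited.add(address)
        for inp in event.get("inputs", []):
            upstream = inp.get("facets", {}).get("dataSource")
            if upstream:
                edges.append((upstream, address))
                queue.append(upstream)
    return edges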

Releasing via CI

With the upcoming release of OpenLineage 0.1.0, we'll want to publish our artifacts via CI. Below, I propose the following release steps:

  1. Bump the version in gradle.properties for openlineage-java and in setup.py for openlineage-python
  2. Tag the release with X.Y.Z on main branch
  3. Prepare the next development version
  4. CI kicks off release jobs to publish artifacts

Note: The steps outlined above are inspired by the release steps for Marquez. Also, for step 1, we can reference new-version.sh used on Marquez releases.

@mobuchowski: You've already made some great additions to our CI config, this is just to capture the CI release flow.

[PROPOSAL] track custom state and transitions

Purpose (Why?):
Job schedulers often have their own state machine that we'd want to capture, for example to understand which events might or might not trigger the job, and why (ex: some input data became available but not all dependencies are resolved yet).

Proposed implementation (How?):
Add a StateTransitionRunFacet capturing the new state the run is in, as well as the previous state and the event that triggered the change. Note that the new state might be the same as the previous state.
I would follow the terminology defined here: https://en.wikipedia.org/wiki/UML_state_machine

Example:
StateTransition run facet

stateTransition: {
  previousState: "WAITING",
  event: "INPUT_AVAILABLE",
  newState: "RUNNING"
}

Initial questions:

[INTEGRATION][Airflow] Collect the query plan in Snowflake integration

Given the Snowflake query id, the explain plan can be retrieved with the following query:
(using the query id 019bbdfa-0400-571a-0000-00301d038151 as an example here)

select system$explain_plan_json('019bbdfa-0400-571a-0000-00301d038151') as explain_plan;

This returns a json representation of the plan that should go in a Snowflake facet

Here is an example of a plan:

{
  "GlobalStats":{
    "partitionsTotal":4,
    "partitionsAssigned":4,
    "bytesAssigned":2042880
  },
  "Operations":[
    [
      {
        "id":0,
        "operation":"Result",
        "expressions":[
          "number of rows inserted"
        ]
      },
      {
        "id":1,
        "parent":0,
        "operation":"Insert",
        "objects":[
          "DEMO_DB.PUBLIC.FOO"
        ]
      },
      {
        "id":2,
        "parent":1,
        "operation":"SortWithLimit",
        "expressions":[
          "sortKey: [SUM(LINEITEM.L_EXTENDEDPRICE * (1 - LINEITEM.L_DISCOUNT)) DESC NULLS FIRST]",
          "rowCount: 20"
        ]
      },
      {
        "id":3,
        "parent":2,
        "operation":"InnerJoin",
        "expressions":[
          "joinKey: (ORDERS.O_ORDERKEY = LINEITEM.L_ORDERKEY)"
        ]
      },
      {
        "id":4,
        "parent":3,
        "operation":"InnerJoin",
        "expressions":[
          "joinKey: (NATION.N_NATIONKEY = CUSTOMER.C_NATIONKEY)"
        ]
      },
      {
        "id":5,
        "parent":4,
        "operation":"TableScan",
        "objects":[
          "SNOWFLAKE_SAMPLE_DATA.TPCH_SF001.NATION"
        ],
        "expressions":[
          "N_NATIONKEY",
          "N_NAME"
        ],
        "partitionsAssigned":1,
        "partitionsTotal":1,
        "bytesAssigned":2048
      },
      {
        "id":6,
        "parent":4,
        "operation":"InnerJoin",
        "expressions":[
          "joinKey: (ORDERS.O_CUSTKEY = CUSTOMER.C_CUSTKEY)"
        ]
      },
      {
        "id":7,
        "parent":6,
        "operation":"Filter",
        "expressions":[
          "(ORDERS.O_ORDERDATE >= '1993-10-01') AND (ORDERS.O_ORDERDATE < '1993-11-01')"
        ]
      },
      {
        "id":8,
        "parent":7,
        "operation":"TableScan",
        "objects":[
          "SNOWFLAKE_SAMPLE_DATA.TPCH_SF001.ORDERS"
        ],
        "expressions":[
          "O_ORDERKEY",
          "O_CUSTKEY",
          "O_ORDERDATE"
        ],
        "partitionsAssigned":1,
        "partitionsTotal":1,
        "bytesAssigned":407552
      },
      {
        "id":9,
        "parent":6,
        "operation":"JoinFilter",
        "expressions":[
          "joinKey: (NATION.N_NATIONKEY = CUSTOMER.C_NATIONKEY)"
        ]
      },
      {
        "id":10,
        "parent":9,
        "operation":"TableScan",
        "objects":[
          "SNOWFLAKE_SAMPLE_DATA.TPCH_SF001.CUSTOMER"
        ],
        "expressions":[
          "C_CUSTKEY",
          "C_NAME",
          "C_ADDRESS",
          "C_NATIONKEY",
          "C_PHONE",
          "C_ACCTBAL",
          "C_COMMENT"
        ],
        "partitionsAssigned":1,
        "partitionsTotal":1,
        "bytesAssigned":109568
      },
      {
        "id":11,
        "parent":3,
        "operation":"Filter",
        "expressions":[
          "LINEITEM.L_RETURNFLAG = 'R'"
        ]
      },
      {
        "id":12,
        "parent":11,
        "operation":"JoinFilter",
        "expressions":[
          "joinKey: (ORDERS.O_ORDERKEY = LINEITEM.L_ORDERKEY)"
        ]
      },
      {
        "id":13,
        "parent":12,
        "operation":"TableScan",
        "objects":[
          "SNOWFLAKE_SAMPLE_DATA.TPCH_SF001.LINEITEM"
        ],
        "expressions":[
          "L_ORDERKEY",
          "L_EXTENDEDPRICE",
          "L_DISCOUNT",
          "L_RETURNFLAG"
        ],
        "partitionsAssigned":1,
        "partitionsTotal":1,
        "bytesAssigned":1523712
      }
    ]
  ]
}

For the following query:

 insert into DEMO_DB.PUBLIC.foo (c_custkey, c_name, revenue, c_acctbal,
    n_name,
    c_address,
    c_phone,
    c_comment) SELECT
    c_custkey,
    c_name,
    sum(l_extendedprice * (1 - l_discount)) as revenue,
    c_acctbal,
    n_name,
    c_address,
    c_phone,
    c_comment
FROM
    customer,
    orders,
    lineitem,
    nation
WHERE
    c_custkey = o_custkey
    AND l_orderkey = o_orderkey
    AND o_orderdate >= date '1993-10-01'
    AND o_orderdate < date '1993-11-01'
    AND l_returnflag = 'R'
    AND c_nationkey = n_nationkey
GROUP BY
    c_custkey,
    c_name,
    c_acctbal,
    c_phone,
    n_name,
    c_address,
    c_comment
ORDER BY
    revenue desc
LIMIT 20;

Note that the "operation":"TableScan" nodes have the input tables in their objects field,
and in this case the "operation":"Insert" node has the output table in its objects field. A sketch of extracting inputs and outputs from the plan follows.
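
A short sketch of how the inputs and outputs could be pulled out of that plan JSON for a (hypothetical) Snowflake query-plan facet:

import json


def tables_from_plan(plan_json):
    # Walk the Operations array: TableScan nodes name the input tables,
    # and the Insert node names the output table.
    plan = json.loads(plan_json)
    inputs, outputs = set(), set()
    for step in plan["Operations"]:
        for node in step:
            if node["operation"] == "TableScan":
                inputs.update(node.get("objects", []))
            elif node["operation"] == "Insert":
                outputs.update(node.get("objects", []))
    return sorted(inputs), sorted(outputs)


# For the plan above, this yields the four TPCH sample tables as inputs
# and DEMO_DB.PUBLIC.FOO as the output.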

package level job metadata

Purpose

To expose information like the version of the job and its source repository, Marquez needs a mechanism to provide metadata in the job package when deploying to a scheduler.
An example of this is deploying DAGs to an Airflow instance, but this is applicable to all package deployments in general.
In some instances, workflows/jobs are pulled directly from a source control system by the scheduler, allowing some metadata to be inferred from the environment. In other instances, the jobs are packaged and deployed to a scheduler (example: Google Cloud Composer or Astronomer). This doc concerns the latter case of providing metadata related to the source code of the job at build time.

Proposal

We define a metadata file to be added at the root of the package/deployment location. This file is named .OpenLineage.job.facets.json and is a json file with the following initial properties (that can be expanded later).

Properties:

  • sourceCodeLocation.type: the source control system
    Examples: GIT, SVN, ...
  • sourceCodeLocation.url: the URL to the repository
    Examples:
  • sourceCodeLocation.path: the path in the repo containing the source files
    Example: path/to/my/dags
    It is intended that we can reconstruct a URL from that path pointing to the file defining a job: https://github.com/{org}/{repo}/tree/{version}/{path}/{file path in the package}
  • sourceCodeLocation.version: the current version deployed (not a branch name, the actual unique version)
    Examples:
    • Git: the git sha
    • Svn: the revision number
  • sourceCodeLocation.tag: optional tag name
    Examples: {tag name}
  • sourceCodeLocation.branch: optional branch name

JSON Schema

{
  "$schema": "https://json-schema.org/draft/2019-09/schema",
  "type": "object",
  "properties": {
    "sourceCodeLocation": {
      "type": "object",
      "properties": {
        "type": {
          "type": "string"
        },
        "url": {
          "type": "string"
        },
        "path": {
          "type": "string"
        },
        "version": {
          "type": "string"
        },
        "branch": {
          "type": "string"
        }
      },
      "required": [
        "type",
        "url",
        "path",
        "version",
        "branch"
      ]
    }
  },
  "required": [
    "sourceCodeLocation"
  ]
}

Example:

For the DAG defined at: https://github.com/MarquezProject/marquez-airflow-quickstart/blob/693e35482bc2e526ced2b5f9f76ef83dec6ec691/dags/dummy_example.py

{ 
  "sourceCodeLocation": {
    "type": "GIT",
    "url: "https://github.com/MarquezProject/marquez-airflow-quickstart.git",
    "path": "dags",
    "version": "693e35482bc2e526ced2b5f9f76ef83dec6ec691",
    "branch": "master"
  }
}

[INTEGRATION] [Spark] Making the spark integration extensible

Context

The SparkListener approach was already implemented in the Spark integration prior to the code being moved from Marquez to OpenLineage. Much of the context here is obsolete. However, the extensibility issue still needs to be addressed.

The Spark integration relies on instrumentation achieved by adding a jar to the javaagent argument on the command line when starting a Spark job. When the JVM starts up, the application ClassLoader loads both the instrumentation jar as well as the Spark, Hadoop, and other dependency jars that are on the classpath. When the SparkContext is loaded, Spark parses its own configuration and finds additional jars that have been specified, either on the command line or in a configuration map that can be populated at runtime. The jars are (sometimes) downloaded and added to a MutableURLClassLoader that is owned and managed by the SparkContext.

Since the Marquez integration is loaded by the top-level application ClassLoader, and Java's ClassLoaders are ignorant of their child ClassLoaders, the integration is only able to find classes that are loaded by that top-level ClassLoader. BigQuery and other dependencies may be specified in the spark.jars configuration, so they are loaded by that child MutableURLClassLoader and are therefore invisible to the Marquez agent classes. While it is possible for the integration to detect that BigQuery classes are available, attempting to instantiate the classes that actually deal with them causes NoClassDefFoundErrors, since the ClassLoader can't find the classes loaded by the child loader.

The current workaround relies on instrumentation to invoke methods from the parent ClassLoader without needing to directly access the BigQueryRelation class or related classes.

I think the instrumentation approach has the following drawbacks

  1. We lose type safety and the ability to easily unit test the instrumentation code
  2. Failure to instrument the code at runtime leaves users blind to the missing dataset nodes
  3. Isn't extensible to support other integrations

To be fair, it has the following advantages

  1. Only a single jar is needed at runtime
  2. It's possible for a single jar to support multiple, statically non-compatible versions of the BigQuery integration using conditional checks (e.g., if ( class.hasMethod("X")) generateXSpecificCode())

Reflection has been suggested as an alternative strategy, but I think the pros/cons are largely the same as the instrumentation approach- it uses indirect method invocation to circumvent static type checking.

Proposal

I propose a more extensible method of loading and discovering node visitors. The basic strategy is

  1. Create a common interface for visiting logical plan nodes - something that extends the PartialFunction<LogicalPlan, List<Dataset>> interface we already use
  2. Take advantage of the JVM's ServiceLoader capabilities to enable implementations on the classpath to be found at runtime
  3. Package node visitors in different subprojects that generate their own jars
  4. Entrust Spark users to declare dependencies on the node visitors they need using the spark.jars.packages or spark.jars configuration properties

The main advantages I see here are

  1. Static typing, easy to read code, and easy unit testing
  2. Extensible framework to support new integrations
    This point, I feel, is extremely important. While the single jar deployment is convenient, it creates an unnecessary barrier for users who want to take part in the OpenLineage ecosystem, but can't/won't contribute directly to Marquez or OpenLineage. This may be because
  • their company has strict open source contribution policies
  • they need to work with some proprietary, custom datasource. E.g., a robotics company might have a datasource specific to ROS topics that are particular to that company
  • they want to deploy their integration without going through the community review/release process
    Forcing developers to fork the repo in order to add custom integrations is a deterrent to building an ecosystem.
  3. Adding integrations is less complex, as developers don't need to worry about, e.g., conflicting versions of Guava required by the BigQueryRelation and the HBaseRelation.

The tradeoff here is that users need to declare the MarquezOpenLineage integration dependencies and deploy them separately from the main jar. Statically typed dataset integrations should be loaded by the Spark ClassLoader (e.g., declared in the spark.jars.packages or spark.jars configuration) in order to link to the Datasource classes they depend on. In terms of developer experience, I don't think this pattern is foreign to Spark users today.

Notably, nothing prohibits users from bundling their own superjars that contain all of their dependencies and deploying them to their Spark clusters. Similarly, nothing prevents integration authors from using reflection in their code. Users are given the freedom to extend and package their integrations in a way that suits them without enforcing an opaque, black-box model.

As a separate issue, we can consider changing the MarquezOpenLineage agent to be loaded by the spark.extraListeners configuration. This would allow users to reference all of the MarquezOpenLineage integration dependencies by specifying the listener and the node visitors in spark.jars.packages so that all of the classes are loaded by the Spark ClassLoader. That eliminates any confusion about which components need to be loaded by which ClassLoader. It also simplifies testing and could enable integration in serverless environments, such as AWS Glue, where users don't have access to modify the JVM arguments.

This has already been implemented

Packaging Spark Integrations

Spark integration packages need to create a file in META-INF/services/marquez.spark.agent.lifecycle.plan.DatasetSource which declares a list of classes that implement the openlineage.spark.agent.lifecycle.plan.DatasetSource interface. During startup, the main MarquezOpenLineage agent uses the JVM ServiceLoader to find all service implementations on the classpath (we need to specifically check the Spark URLClassloader to find implementations loaded by Spark at runtime). Combining jars requires concatenating the contents of the META-INF/services/marquez.spark.agent.lifecycle.plan.DatasetSource files so that all implementations are listed.

Deploying Dependencies

Currently, users deploy the Marquez integration by adding the javaagent:<jar>=http://marquez_url/path/to/job argument to their spark-submit args. That doesn't change with this proposal.

What does change is that users would be required to specify the integrations they require in their spark.jars or spark.jars.packages configuration. As an example, the airflow example in the README would look like this:

from uuid import uuid4
import os
...
job_name = 'job_name'


properties = {
  'spark.extraListeners': 'openlineage.spark.agent.OpenLineageSparkListener',
  'spark.jars.packages': 'io.openlineage:openlineage-spark:0.0.1',
  'spark.openlineage.host': os.environ.get('MARQUEZ_URL'),
  'spark.openlineage.namespace': os.getenv('MARQUEZ_NAMESPACE', 'default'),
  'spark.openlineage.apiKey': os.environ.get('MARQUEZ_API_KEY'),
  'spark.openlineage.parentRunId': "{{task_run_id(run_id, task)}}",
  'spark.openlineage.parentJobName': '{{ task.task_id }}'
}

t1 = DataProcPySparkOperator(
    task_id=job_name,
    gcp_conn_id='google_cloud_default',
    project_id='project_id',
    cluster_name='cluster-name',
    region='us-west1',
    main='gs://bucket/your-prog.py',
    job_name=job_name,
    dataproc_pyspark_properties=properties,

    #Both bigquery relation and bigquery openlineage integration declared here
    dataproc_pyspark_jars='gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar,gs://my_bucket/path/to/bigquery-integration.jar', 
    dag=dag)

Alternatively, a cluster can be created by loading the correct jars into the Spark jars directory, as described in the BigQuery connector docs. In the same way, the MarquezOpenLineage integrations would need to be installed on the driver node. In either case, the person responsible for loading the bigquery jars on the classpath is also responsible for loading the Marquez integration. Dependencies can be bundled so that the Marquez integration and the BigQueryRelation class are all loaded in the same jar.

[PROPOSAL] Dataset version facet

Purpose:
Many databases have explicit versioning in place for datasets. It is highly desirable to track the specific version of the dataset if it is known at consumption/generation time. Rather than relying on the service to correctly tie job inputs to specific versions of a dataset, the client should be allowed to report that information if it is available.

Proposed implementation
A dataset version facet may be sufficient to report dataset version information, if it exists. Custom facets can be used to include any database-specific version information, while the minimum required fields should include a versionName string field; an illustrative example follows.
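
For illustration, such a facet might look like the following on an input dataset; the facet key and the extra field are hypothetical:

# Hypothetical dataset version facet; the "version" key and the extra
# "snapshotTimestamp" field are illustrative only.
input_dataset = {
    "namespace": "my-datasource",
    "name": "analytics.events",
    "facets": {
        "version": {
            "versionName": "42",
            "snapshotTimestamp": "2021-02-22T00:00:00Z",
        }
    },
}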

[PROPOSAL] Define the OpenLineage spec versioning mechanism

Purpose:
We need to define how the OpenLineage spec gets versioned and published.
Some requirements:

  • The OpenLineage spec and related libraries are in the OpenLineage repo.
  • The OpenLineage spec version should change only when the spec itself changes.
  • The libraries in the repo change more frequently than the spec (including when the spec changes).
  • We want to version the OpenLineage spec independently of the api spec.
  • The mechanism to version and publish the OpenLineage core spec, should apply to publishing custom facets.

Proposed implementation

  • The spec defines its current version using the “$id” field.
  • The $id URL uses a SemVer-compliant version, following the SchemaVer semantics:
    MODEL-REVISION-ADDITION
    • MODEL when you make a breaking schema change which will prevent interaction with any historical data
    • REVISION when you introduce a schema change which may prevent interaction with some historical data
    • ADDITION when you make a schema change that is compatible with all historical data

Implementation plan:

  • CI verifies that:
    • the $id field has the right domain prefix
    • the version changes when the spec changes: when resolving “$id”, the build fails if the published spec is not exactly the same.
    • The version does not change when the spec does not change. We can verify that the current version of the spec is not already published with a different version.
    • Libraries generate events with the current version
    • Make sure the spec is backward compatible (only add optional fields) and consistent with the versioning semantics
  • git pre-commit hook: increments the version automatically when the spec changes.
  • spec publication:
    • CI publishes to github pages when the $id changes on main (when this particular url does not exist yet)
    • CI tags main with OpenLineage.json-{version}
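
A minimal sketch of the "version must change when the spec changes" check, assuming the spec lives at spec/OpenLineage.json and its $id URL is directly fetchable:

# If a spec with this $id is already published, it must be identical to the local
# one; otherwise the version needs to be bumped. File path and hosting are assumptions.
import json
import sys

import requests

with open("spec/OpenLineage.json") as f:
    local_spec = json.load(f)

spec_id = local_spec["$id"]
response = requests.get(spec_id, timeout=10)

if response.status_code == 200 and response.json() != local_spec:
    sys.exit(f"Spec changed but version was not bumped: {spec_id} is already published")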

The discussion is available on a google doc:
https://docs.google.com/document/d/1inhmb90SB62VyYf8nkkpjBDPpyr2wxkIXQ_AFIQiF9I/edit

Move Marquez integrations to OpenLineage

The Marquez community is excited to announce that the following integrations will be moved to the wider OpenLineage ecosystem:

  • marquez-airflow: A library that integrates Airflow DAGs to emit OpenLineage metadata.
  • marquez-spark: A spark agent using jvm instrumentation to emit OpenLineage metadata.
  • marquez-integration-common: Shared code across integrations.
  • marquez-dbt-snowflake: A library that integrates with DBT to emit OpenLineage metadata for Snowflake.
  • marquez-dbt-bigquery: A library that integrates with DBT to emit OpenLineage metadata for BigQuery.

Note: You can view the integrations in the Marquez repo under marquez/integrations/. Marquez is an official reference implementation of the OpenLineage API and all the integrations adopt the OpenLineage standard.

Why are the integrations being moved to OpenLineage?

The OpenLineage standard was influenced by Marquez's underlying data model (datasets, jobs, runs) while also drawing inspiration from the project's lineage ingestion API. With OpenLineage now a standalone project under the LF AI & Data Foundation, we've decided that now is the right time to move the integrations over, as more and more integrations are being added to the project's roadmap (Looker, Flink, Beam, etc.). This will also allow the OpenLineage community to grow and learn from existing integration implementations, prioritize bug fixes, propose new integrations and changes to the spec, etc.

What is the timeline for moving the integrations to OpenLineage?

Below we outline a high-level timeline of moving the integrations to OpenLineage. We encourage everyone to provide feedback and express any concerns you may have by commenting in this issue:

  • Release Marquez 0.16.0
  • Move the following integrations from Marquez to OpenLineage:
    • marquez-airflow #93
    • marquez-spark #96
    • marquez-integration-common #74
    • marquez-dbt-snowflake #117
    • marquez-dbt-bigquery #117
  • Deprecate all integrations in Marquez, then link to the new location in the OpenLineage repo MarquezProject/marquez#1585

Going forward, new integrations and improvements will be made by opening pull requests against the OpenLineage repo. Note that during this migration phase, existing Marquez-specific functionality and environment variables will be maintained to ensure backwards compatibility. The main difference is that integration packages will get a new name: openlineage-* instead of marquez-*.

Marquez doesn't collect dataset metadata for BigQueryOperator in Airflow

Marquez doesn't collect dataset metadata for BigQueryOperator in Airflow. To collect BigQuery metadata, I tried the Airflow example from the repo, but instead of PostgresOperator, I used BigQueryOperator. So, for example, I modified operators in counter.py as:

import random

from airflow.contrib.operators.bigquery_operator import BigQueryOperator

# BQ_CONN_ID and dag are assumed to be defined elsewhere in the DAG file.

t1 = BigQueryOperator(
    task_id='if_not_exists',
    bigquery_conn_id=BQ_CONN_ID,
    sql='''
    CREATE TABLE IF NOT EXISTS `my-project.my_dataset.counts` (
      value INTEGER
    );''',
    use_legacy_sql=False,
    dag=dag
)
t2 = BigQueryOperator(
    task_id='inc',
    bigquery_conn_id=BQ_CONN_ID,
    sql='''
    INSERT INTO `my-project.my_dataset.counts` (value)
         VALUES ({{params.value}})
    ''',
    params={
        "value": random.randint(1, 10)
    },
    use_legacy_sql=False,
    dag=dag
)

The DAGs ran successfully in Airflow, and the counts and sums tables were created on GCP. In the Marquez UI, metadata for the jobs can be seen, but not for the datasets.

[PROPOSAL] Move the OpenLineage spec to JSONSchema (from OpenAPI)

Purpose:
OpenAPI and JSONSchema are mostly the same model, with the goal that OpenAPI becomes a strict superset of JSONSchema in the near future.
The goal here is to make OpenLineage independent of the underlying protocol. OpenAPI assumes a REST endpoint, which is not our intent. This change would still make it possible to define an OpenLineage HTTP endpoint, while decoupling the model and making it more obvious that it can be used with Kafka, for example.

Proposed implementation
Convert the OpenAPI spec to a JSON Schema.

[INTEGRATION] [Spark] double triggers on DataFrame -> RDD conversion

Discovered when running tests: when a DataFrame is converted to an RDD (such as when writing to a JDBC relation), the Marquez listener is triggered twice, once for the DataFrame operation and again for the RDD operation. Fortunately, the original DataFrame LogicalPlan contains the entire input/output graph, so no information is lost. However, the second set of events does post a new job with no inputs or outputs to the Marquez endpoint.

Example of two posted events, one from the DataFrame and one from the RDD:

Publishing sql event LineageEvent(eventType=COMPLETE, eventTime=2021-01-01T00:00Z, run=LineageEvent.Run(runId=39f36ea5-bd33-4ff5-9a64-00fa2f439d21, facets=LineageEvent.RunFacet(nominalTime=null, parent=LineageEvent.ParentRunFacet(run=LineageEvent.RunLink(runId=ab478a6e-0226-46bb-b5c3-2b23179a1277), job=LineageEvent.JobLink(namespace=Namespace, name=ParentJob)), additional={spark.logicalPlan=LogicalPlanFacet(plan=[{"class":"org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand","num-children":0,"query":[{"class":"org.apache.spark.sql.catalyst.plans.logical.Filter","num-children":1,"condition":[{"class":"org.apache.spark.sql.catalyst.expressions.GreaterThan","num-children":2,"left":0,"right":1},{"class":"org.apache.spark.sql.catalyst.expressions.AttributeReference","num-children":0,"name":"age","dataType":"long","nullable":true,"metadata":{},"exprId":{"product-class":"org.apache.spark.sql.catalyst.expressions.ExprId","id":6,"jvmId":"2bb20251-cb0e-4586-a2b0-ae90290f14e4"},"qualifier":[]},{"class":"org.apache.spark.sql.catalyst.expressions.Cast","num-children":1,"child":0,"dataType":"long","timeZoneId":"America/Los_Angeles"},{"class":"org.apache.spark.sql.catalyst.expressions.Literal","num-children":0,"value":"16","dataType":"integer"}],"child":0},{"class":"org.apache.spark.sql.execution.datasources.LogicalRelation","num-children":0,"relation":null,"output":[[{"class":"org.apache.spark.sql.catalyst.expressions.AttributeReference","num-children":0,"name":"age","dataType":"long","nullable":true,"metadata":{},"exprId":{"product-class":"org.apache.spark.sql.catalyst.expressions.ExprId","id":6,"jvmId":"2bb20251-cb0e-4586-a2b0-ae90290f14e4"},"qualifier":[]}],[{"class":"org.apache.spark.sql.catalyst.expressions.AttributeReference","num-children":0,"name":"name","dataType":"string","nullable":true,"metadata":{},"exprId":{"product-class":"org.apache.spark.sql.catalyst.expressions.ExprId","id":7,"jvmId":"2bb20251-cb0e-4586-a2b0-ae90290f14e4"},"qualifier":[]}]],"isStreaming":false}],"dataSource":null,"options":null,"mode":null}])})), job=LineageEvent.Job(namespace=Namespace, name=word_count.execute_save_into_data_source_command, facets=null), inputs=[LineageEvent.Dataset(namespace=sqlite:file:///var/folders/x_/47q8z0xx2357hrl3w6k9xf_80000gn/T/junit12164537964587498492/sqlite/database, name=data_table, facets=LineageEvent.DatasetFacet(documentation=null, schema=LineageEvent.SchemaDatasetFacet(fields=[LineageEvent.SchemaField(name=age, type=decimal(20,0), description=null), LineageEvent.SchemaField(name=name, type=string, description=null)]), dataSource=LineageEvent.DatasourceDatasetFacet(name=sqlite:file:///var/folders/x_/47q8z0xx2357hrl3w6k9xf_80000gn/T/junit12164537964587498492/sqlite/database, uri=sqlite:file:///var/folders/x_/47q8z0xx2357hrl3w6k9xf_80000gn/T/junit12164537964587498492/sqlite/database), description=null, additional={}))], outputs=[LineageEvent.Dataset(namespace=sqlite:file:///var/folders/x_/47q8z0xx2357hrl3w6k9xf_80000gn/T/junit12164537964587498492/sqlite/database, name=data_table, facets=LineageEvent.DatasetFacet(documentation=null, schema=LineageEvent.SchemaDatasetFacet(fields=[LineageEvent.SchemaField(name=age, type=decimal(20,0), description=null), LineageEvent.SchemaField(name=name, type=string, description=null)]), dataSource=LineageEvent.DatasourceDatasetFacet(name=sqlite:file:///var/folders/x_/47q8z0xx2357hrl3w6k9xf_80000gn/T/junit12164537964587498492/sqlite/database, 
uri=sqlite:file:///var/folders/x_/47q8z0xx2357hrl3w6k9xf_80000gn/T/junit12164537964587498492/sqlite/database), description=null, additional={}))], producer=https://github.com/MarquezProject/marquez/tree/0.12.0/integrations/spark)
Publishing RDD event LineageEvent(eventType=COMPLETE, eventTime=2021-01-01T00:00Z, run=LineageEvent.Run(runId=ab478a6e-0226-46bb-b5c3-2b23179a1277, facets=LineageEvent.RunFacet(nominalTime=null, parent=LineageEvent.ParentRunFacet(run=LineageEvent.RunLink(runId=ab478a6e-0226-46bb-b5c3-2b23179a1277), job=LineageEvent.JobLink(namespace=Namespace, name=ParentJob)), additional={})), job=LineageEvent.Job(namespace=Namespace, name=word_count, facets=null), inputs=[], outputs=[], producer=https://github.com/MarquezProject/marquez/tree/0.12.0/integrations/spark)

Note that the RDD event has no inputs/outputs and has a distinct run id.

[PROPOSAL] Make job run ids unique

Purpose:
Currently, the Run object has a runId property that is defined as "The id of the run, unique relative to the job" https://github.com/OpenLineage/OpenLineage/blob/main/spec/OpenLineage.json#L61-L63 . This implies a run id may be reused across jobs, so to link a job run to a parent run, the job name and namespace are required in addition to the run id (see https://github.com/OpenLineage/OpenLineage/blob/main/spec/OpenLineage.json#L273-L311 ).

In practice, Marquez generates UUIDs for the job run ids (e.g., see the airflow integration at https://github.com/MarquezProject/marquez/blob/main/integrations/airflow/marquez_airflow/dag.py#L228-L229 ). In the spark integration, there's confusion about whether the passed job run parameters belong to the parent run or to a new job run (see the README at https://github.com/MarquezProject/marquez/blob/main/integrations/spark/README.md and the context at https://github.com/MarquezProject/marquez/blob/main/integrations/spark/src/main/java/marquez/spark/agent/MarquezContext.java#L29-L31 ).

Given the implementation already uses UUIDs for job runs, I think the requirement to pass the additional job information is a bit burdensome and redundant. It also leaves room for confusion in implementation projects, which would be nice to eliminate.

Proposed implementation
I propose requiring that job run ids be globally unique (at least within a given OpenLineage backend) and removing the job parameter from the ParentRunFacet. Integrations should pass around these globally unique runIds to link job runs to parents, as illustrated below. This is in line with the existing Marquez model (see https://github.com/MarquezProject/marquez/blob/main/clients/python/marquez_client/client.py#L325-L327 ) and will reduce confusion in integration implementations, such as the spark integration project.
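
For illustration, under this proposal a child run would reference its parent using nothing but the parent's globally unique runId; the exact facet shape shown here is hypothetical:

# Illustration: runIds are globally unique UUIDs, so the parent reference needs
# only the parent's runId (no job namespace/name). The facet shape is hypothetical.
import uuid

parent_run_id = str(uuid.uuid4())

child_event = {
    "eventType": "START",
    "run": {
        "runId": str(uuid.uuid4()),
        "facets": {
            "parent": {"run": {"runId": parent_run_id}},
        },
    },
    "job": {"namespace": "spark", "name": "word_count"},
}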

[PROPOSAL] Standardize custom facet naming

Purpose:
The main motivation of this proposal is to make sure that custom facets in different implementations of OpenLineage clients for the same data source have the same name. Another advantage is a decreased chance of different facet names colliding.

Proposed implementation
I propose that custom facets follow the name pattern {prefix}{name}{entity}Facet, PascalCased, where

  • Prefix must be distinct identifier named after the project
  • Name is the desired facet name
  • Entity is the core OpenLineage entity for which the facet will be attached

An important aspect for OpenLineage consumers is standardizing JSON key naming.
I propose to follow the pattern {prefix}_{name}, snakeCased.

Example of custom facet following both patterns: BigQueryStatisticsJobFacet and bigQuery_statistics
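
A small demonstration of applying both patterns; the helper functions are illustrative only:

# Illustrative helpers that apply the proposed naming patterns.
def facet_class_name(prefix, name, entity):
    # PascalCased {Prefix}{Name}{Entity}Facet, e.g. BigQueryStatisticsJobFacet
    return f"{prefix}{name}{entity}Facet"


def facet_json_key(prefix, name):
    # {prefix}_{name}, e.g. bigQuery_statistics
    return f"{prefix}_{name}"


assert facet_class_name("BigQuery", "Statistics", "Job") == "BigQueryStatisticsJobFacet"
assert facet_json_key("bigQuery", "statistics") == "bigQuery_statistics"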

[PROPOSAL] Externalizing Dataset schema facet into custom files

Proposal

IIUC, the specification expects - by default - each Dataset to encode its schema inline in the spec, through SchemaDatasetFacet. I suggest allowing the schema either to be encoded inline or, alternatively, to be referenced by an external schema file URL, according to the schema language of choice of the underlying system.

For example, in a data mesh implementation, each data node - uniquely addressable - provides a schema endpoint (URL) for its output datasets. The schema language can follow a different standard, based on http://json-schema.org/, that encapsulates the semantics and syntax of the datasets.
The schema URL associated with the data mesh node is versioned with the data and localized with the dataset itself.
Requiring the schema file to be encoded according to the lineage format - e.g. encoding recognized fields such as types and leaving out unrecognized fields such as range, valid values, etc. - means writing adapters and extending the implementation of the underlying serializer.
However, leaving the schema as simply a URL to whatever schema format/file/location the underlying mesh or lake uses does not require such an extension - until there is a particular functionality of the implementation that really needs to parse the schema, which I don't think is a first-class concern. I could be wrong.

Proposed Implementation

Make an external URL a first-class member of SchemaDatasetFacet, and make the inline schema definition optional; an illustrative example follows.
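
For illustration, the facet could then take either form; the "schemaURL" field name is a hypothetical choice:

# Illustration of the proposal; the "schemaURL" field name is hypothetical.
inline_schema_facet = {
    "fields": [{"name": "member_id", "type": "STRING"}],
}

external_schema_facet = {
    # Points at whatever schema document the underlying system already publishes.
    "schemaURL": "https://members.mesh.com/output-ports/members-daily/schema.json",
}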

Add license header to source files

SPDX-License-Identifier: Apache-2.0

From LFAI&Data:

We typically recommend that projects include a short license notice in each project source code file, wherever reasonably possible (e.g., don't worry about this for image files, formats without comments like JSON, etc).

Adding these notices has several benefits, including improving code reuse in other projects by making sure that the license info travels along with the source code file if it is used elsewhere. They can be easily added using a one-line comment with an SPDX short-form identifier, in the following format (adjusting for the file's particular format for comments):

# SPDX-License-Identifier: Apache-2.0

More information on this format is available at https://spdx.dev/ids. It looks like several of the documentation files have these already (correctly with the documentation's CC-BY-4.0 license), so I'm noting this primarily for source code files.

OpenLineage artifacts published on PRs

OpenLineage artifacts (openlineage-java, openlineage-python) are currently being published in PRs, see #60. We'll only want to publish artifacts on the main branch.

Related to #61

[PROPOSAL] Modeling metadata update events

Purpose:

Consider an event like an owner_updated event: we would need the same asset as part of both inputs and outputs. This event takes an existing asset as input, so the asset is part of the input assets. It also updates the metadata of the asset; in this sense, the event outputs a new state of the asset, so the asset is also part of the output assets.

We should discuss how best to represent such events in the OpenLineage format; one possible shape is sketched below.
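
One possible representation, purely for illustration: the same dataset appears in both inputs and outputs, with a facet on the output side carrying the new state. The "ownership" facet name is hypothetical:

# Illustrative only: the dataset appears in both inputs and outputs of the
# metadata-update event; the "ownership" facet name is hypothetical.
dataset_ref = {"namespace": "warehouse", "name": "analytics.events"}

owner_updated_event = {
    "eventType": "COMPLETE",
    "job": {"namespace": "governance", "name": "owner_updated"},
    "inputs": [dataset_ref],
    "outputs": [
        {**dataset_ref, "facets": {"ownership": {"owners": [{"name": "data-platform-team"}]}}}
    ],
}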

Handling of Dataset Partitions

If I have a job scheduled to push a partition to a table each hour,
and a job running daily that consumes the last 24 partitions created by the previous job,
would Marquez be able to model this dependency?

I don't see a way to use multiple datasetVersions as input in a request.
