astronomer / airflow-provider-great-expectations

Great Expectations Airflow operator

Home Page: http://greatexpectations.io

License: Apache License 2.0

Python 85.29% CSS 0.62% Jupyter Notebook 14.09%
data-science airflow-operators data-quality data-testing airflow airflow-providers

airflow-provider-great-expectations's Introduction

Apache Airflow Provider for Great Expectations

A set of Airflow operators for Great Expectations, a Python library for testing and validating data.

Version Warning:

Due to apply_default decorator removal, this version of the provider requires Airflow 2.1.0+. If your Airflow version is < 2.1.0, and you want to install this provider version, first upgrade Airflow to at least version 2.1.0. Otherwise, your Airflow package version will be upgraded automatically, and you will have to manually run airflow upgrade db to complete the migration.

Notes on compatibility

  • This operator currently works with the Great Expectations V3 Batch Request API only. If you would like to use the operator in conjunction with the V2 Batch Kwargs API, you must use a version below 0.1.0
  • This operator uses Great Expectations Checkpoints instead of the former ValidationOperators.
  • Because of the above, this operator requires Great Expectations >=v0.13.9, which is pinned in the requirements.txt starting with release 0.0.5.
  • Great Expectations version 0.13.8 contained a bug that would make this operator not work.
  • Great Expectations version 0.13.7 and below will work with version 0.0.4 of this operator and below.

This package has been most recently unit tested with apache-airflow==2.4.3 and great-expectations==0.15.34.

Formerly, there was a separate operator for BigQuery, to facilitate the use of GCP stores. This functionality is now baked into the core Great Expectations library, so the generic Operator will work with any back-end and SQL dialect for which you have a working Data Context and Datasources.

Installation

Prerequisites: an environment running great-expectations and apache-airflow. These are requirements of this package and will be installed as dependencies.

pip install airflow-provider-great-expectations

Depending on your use case, you might need to add ENV AIRFLOW__CORE__ENABLE_XCOM_PICKLING=true to your Dockerfile to enable XCom to pass data between tasks.

Usage

The operator requires a DataContext to run, which can be specified either as:

  1. A path to a directory in which a yaml-based DataContext configuration is located
  2. A Great Expectations DataContextConfig object

Additionally, a Checkpoint may be supplied, which can be specified either as:

  1. The name of a Checkpoint already located in the Checkpoint Store of the specified DataContext
  2. A Great Expectations CheckpointConfig object

If no Checkpoint is supplied, a default one will be built.

The operator also enables you to pass in a Python dictionary of kwargs that will be added to, or substituted into, the Checkpoint at runtime.

Modules

Great Expectations Base Operator: A base operator for Great Expectations. Import into your DAG via:

from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator
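For illustration, a minimal task might look like the following sketch. The paths, DAG settings, and checkpoint name are placeholders rather than values shipped with this provider, and the parameter names follow the usage description above:

from datetime import datetime

from airflow import DAG
from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator

with DAG(
    dag_id="example_great_expectations",
    start_date=datetime(2023, 1, 1),
    schedule=None,  # Airflow 2.4+ style; older versions use schedule_interval
    catchup=False,
) as dag:
    validate = GreatExpectationsOperator(
        task_id="validate_my_data",
        # Directory containing great_expectations.yml (option 1 above)
        data_context_root_dir="/usr/local/airflow/include/great_expectations",
        # Name of a Checkpoint stored in that Data Context
        checkpoint_name="my_checkpoint",
        # Optional runtime kwargs merged into the Checkpoint
        checkpoint_kwargs={"run_name_template": "airflow-%Y%m%d"},
    )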

Previously Available Email Alert Functionality

The email alert functionality available in version 0.0.7 has been removed in order to keep the operator narrowly focused on running Great Expectations validations. There is now a validation_failure_callback parameter on the base operator's constructor, which can be used for any kind of notification upon failure, in case the notification mechanisms provided by the Great Expectations framework itself don't suffice.
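As an illustration only (the exact type of the result object handed to the callback depends on your provider and Great Expectations versions, and the paths are placeholders), a callback might look like this:

import logging

from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator


def notify_on_validation_failure(checkpoint_result):
    # Send the failed result wherever you like: Slack, email, a logging system, etc.
    logging.error("Great Expectations validation failed: %s", checkpoint_result)


validate = GreatExpectationsOperator(
    task_id="validate_with_callback",
    data_context_root_dir="/usr/local/airflow/include/great_expectations",
    checkpoint_name="my_checkpoint",
    validation_failure_callback=notify_on_validation_failure,
)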

Examples

See the example_dags directory for an example DAG with some sample tasks that demonstrate operator functionality.

The example DAG can be exercised in one of two ways:

With the open-source Astro CLI (recommended):

  1. Initialize a project with the Astro CLI

  2. Copy the example DAG into the dags/ folder of your astro project

  3. Copy the directories in the include folder of this repository into the include directory of your Astro project

  4. Copy your GCP credentials.json file into the base directory of your Astro project

  5. Add the following to your Dockerfile to install the airflow-provider-great-expectations package, enable xcom pickling, and add the required Airflow variables and connection to run the example DAG:

    RUN pip install --user airflow_provider_great_expectations
    ENV AIRFLOW__CORE__ENABLE_XCOM_PICKLING=True
    ENV GOOGLE_APPLICATION_CREDENTIALS=/usr/local/airflow/credentials.json
    ENV AIRFLOW_VAR_MY_PROJECT=<YOUR_GCP_PROJECT_ID>
    ENV AIRFLOW_VAR_MY_BUCKET=<YOUR_GCS_BUCKET>
    ENV AIRFLOW_VAR_MY_DATASET=<YOUR_BQ_DATASET>
    ENV AIRFLOW_VAR_MY_TABLE=<YOUR_BQ_TABLE>
    ENV AIRFLOW_CONN_MY_BIGQUERY_CONN_ID='google-cloud-platform://?extra__google_cloud_platform__scope=https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fbigquery&extra__google_cloud_platform__project=bombora-dev&extra__google_cloud_platform__key_path=%2Fusr%2Flocal%2Fairflow%2Fairflow-gcp.bombora-dev.iam.gserviceaccount.com.json'
    
  6. Run astro dev start to view the DAG on a local Airflow instance (you will need Docker running)

With a vanilla Airflow installation:

  1. Add the example DAG to your dags/ folder
  2. Make the great_expectations and data directories in include/ available in your environment.
  3. Change the data_file and ge_root_dir paths in your DAG file to point to the appropriate places.
  4. Change the paths in great-expectations/checkpoints/*.yml to point to the absolute path of your data files.
  5. Change the value of enable_xcom_pickling to true in your airflow.cfg
  6. Set the appropriate Airflow variables and connection as detailed in the above instructions for using the astro CLI

Development

Setting Up the Virtual Environment

Any virtual environment tool can be used, but the simplest approach is likely using the venv tool included in the Python standard library.

For example, creating a virtual environment for development against this package can be done with the following (assuming bash):

# Create the virtual environment using venv:
$ python -m venv --prompt my-af-ge-venv .venv

# Activate the virtual environment:
$ . .venv/bin/activate

# Install the package and testing dependencies:
(my-af-ge-venv) $ pip install -e '.[tests]'

Running Unit, Integration, and Functional Tests

Once the above is done, running the unit and integration tests can be done with either of the following approaches.

Using pytest

The pytest library and CLI are preferred by this project, and by many Python developers, because of the rich API and the additional control they give you over things like test output and test markers. It is included as a dependency in requirements.txt.

The simple command pytest -p no:warnings, when run in the virtual environment created with the above process, provides concise output when all tests pass, filtering out deprecation warnings that may be issued by Airflow, and output that is only as detailed as necessary when they don't:

(my-af-ge-venv) $ pytest -p no:warnings
=========================================================================================== test session starts ============================================================================================
platform darwin -- Python 3.7.4, pytest-6.2.4, py-1.10.0, pluggy-0.13.1
rootdir: /Users/jpayne/repos-bombora/bombora-airflow-provider-great-expectations, configfile: pytest.ini, testpaths: tests
plugins: anyio-3.3.0
collected 7 items

tests/operators/test_great_expectations.py .......                                                                                                                                                   [100%]

============================================================================================ 7 passed in 11.99s ============================================================================================

Functional Testing

Functional testing entails simply running the example DAG using one of the approaches outlined above, with the adjustment that the local development package is installed in the target Airflow environment.

Again, the recommended approach is to use the Astro CLI.

This operator is in the early stages of development! Feel free to submit issues or PRs, or join the #integration-airflow channel in the Great Expectations Slack for feedback. Thanks to Pete DeJoy and the Astronomer.io team for the support.

airflow-provider-great-expectations's People

Contributors

antelmoa, armandduijn, bouke-nederstigt, brian-lavery, casyfill, cdkini, denimalpaca, enoreese, ivanstillfront, jeffkpayne, kaxil, kilo59, kyleaton, mpgreg, pankajastro, petedejoy, pgzmnk, pre-commit-ci[bot], snjypl, spbail, sunkickr, talagluck, tjanif, zhangchi1


airflow-provider-great-expectations's Issues

Expectations fail callback function

It would be nice to be able to log the specific expectations that failed when the Operator fails. I am not sure this should be internalized, so perhaps it makes sense instead to set a callback function on failure?

I have something as naive as this in mind:

from itertools import groupby

from airflow.exceptions import AirflowException


def generate_list_failed_expectations(r) -> str:
    """Note - only for validation results

    Input:
        r (great_expectations.validation_operators.types.validation_operator_result.ValidationOperatorResult)
    Returns: str
    """
    lines = ["Validation with Great Expectations failed."]

    for batch, result in r["run_results"].items():
        lines.append(f"{batch}")
        failed = [
            res.expectation_config
            for res in result["validation_result"].results
            if not res.success
        ]
        # Group failed expectations by column for readable output
        for col, group_failed in groupby(failed, key=lambda k: k["kwargs"]["column"]):
            lines.append(f"  {col}")
            for test in group_failed:
                lines.append(f'    - {test["expectation_type"]}')
    raise AirflowException("\n".join(lines))

`GreatExpectationsOperator` fails when running validations against `Athena` but works correctly against `Redshift`

Overview

GreatExpectationsOperator fails when running validations against Athena but works correctly against Redshift.

Conditions

docker-compose airflow
airflow==2.2.3
airflow-provider-great-expectations==0.1.3
great-expectations==0.14.7

Procedure

  • Created a DAG that runs validations inside an AWS environment with a SQLAlchemy engine.
  • Defined both datasources identically, changing only the connection string / credentials.
  • Ran the same operator with the same configuration against both.

Problems found

  • The Data Docs link in the Slack notification is never shown.
    • Not even after modifying notify_with: ["all" | "local_site" | None]
  • Athena validations never execute correctly:
"exception_info": {
  "exception_traceback": "Traceback (most recent call last):\n  File \"/home/***/.local/lib/python3.7/site-packages/great_expectations/execution_engine/execution_engine.py\", line 397, in resolve_metrics\n    new_resolved = self.resolve_metric_bundle(metric_fn_bundle)\n  File \"/home/***/.local/lib/python3.7/site-packages/great_expectations/execution_engine/sqlalchemy_execution_engine.py\", line 815, in resolve_metric_bundle\n    domain_kwargs=domain_kwargs,\n  File \"/home/***/.local/lib/python3.7/site-packages/great_expectations/execution_engine/sqlalchemy_execution_engine.py\", line 505, in get_domain_records\n    selectable = selectable.columns().subquery()\nAttributeError: 'TextAsFrom' object has no attribute 'subquery'\n\nDuring handling of the above exception, another exception occurred:\n\nTraceback (most recent call last):\n  File \"/home/***/.local/lib/python3.7/site-packages/great_expectations/validator/validator.py\", line 845, in resolve_validation_graph\n    runtime_configuration=runtime_configuration,\n  File \"/home/***/.local/lib/python3.7/site-packages/great_expectations/validator/validator.py\", line 1749, in _resolve_metrics\n    runtime_configuration=runtime_configuration,\n  File \"/home/***/.local/lib/python3.7/site-packages/great_expectations/execution_engine/execution_engine.py\", line 401, in resolve_metrics\n    message=str(e), failed_metrics=[x[0] for x in metric_fn_bundle]\ngreat_expectations.exceptions.exceptions.MetricResolutionError: 'TextAsFrom' object has no attribute 'subquery'\n",
  "exception_message": "'TextAsFrom' object has no attribute 'subquery'",
  "raised_exception": true
}

Meanwhile Redshift, with the exact same configuration, runs OK.

Additional information for the issue

  • When we were developing this procedure with a Lambda, the Athena RuntimeBatchRequest query needed to be wrapped in parentheses to work:
"query": f"(select * from db.table limit 100)"
  • Before this last version of the DAG, we were creating a RuntimeBatchRequest in code and pushing it as a parameter to the checkpoint. This produced exactly the same error as Athena does now.

File reference

DAG gist file for reference (modified / deleted confidential information): dag_poc.py

Connection to Athena via `conn_id`

When using the GreatExpectationsOperator with an Athena conn_id, I received the following from PyAthena: Unable to locate credentials.

I would like to know the best way to provide credentials, as this page: GX docs - connect to data does not specify how to set things up with just an Airflow connection.
Should I be using credentials: in the great_expectations.yml file, with secrets injected from config_variables.yml? Is there any way to connect with only the conn_id via an Airflow Connection?

Parallel Execution of GX in Airflow randomly fails. In serial execution always passes

Parallel Execution of GX provider in Airflow randomly fails.

Here is the log:

***   * /mnt/airdrive/airflow/logs/dag_id=xxxx_pipeline_dag_v4/run_id=scheduled__2024-04-07T00:00:00+00:00/task_id=gx_validate_xxx_col_std_dev/attempt=1.log
[2024-04-08, 09:51:48 IST] {taskinstance.py:1979} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: xxxx_pipeline_dag_v4.gx_validate_xxx_col_std_dev scheduled__2024-04-07T00:00:00+00:00 [queued]>
[2024-04-08, 09:51:49 IST] {taskinstance.py:1979} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: xxxx_pipeline_dag_v4.gx_validate_xxx_col_std_dev scheduled__2024-04-07T00:00:00+00:00 [queued]>
[2024-04-08, 09:51:49 IST] {taskinstance.py:2193} INFO - Starting attempt 1 of 1
[2024-04-08, 09:51:49 IST] {taskinstance.py:2214} INFO - Executing <Task(GreatExpectationsOperator): gx_validate_xxx_col_std_dev> on 2024-04-07 00:00:00+00:00
[2024-04-08, 09:51:49 IST] {standard_task_runner.py:60} INFO - Started process 1111645 to run task
[2024-04-08, 09:51:49 IST] {standard_task_runner.py:87} INFO - Running: ['***', 'tasks', 'run', 'xxxx_pipeline_dag_v4', 'gx_validate_xxx_col_std_dev', 'scheduled__2024-04-07T00:00:00+00:00', '--job-id', '622', '--raw', '--subdir', 'DAGS_FOLDER/xxxxxxxx/xxxxx_pipeline_dag_v4.py', '--cfg-path', '/tmp/tmpn6gnlugt']
[2024-04-08, 09:51:49 IST] {standard_task_runner.py:88} INFO - Job 622: Subtask gx_validate_xxx_col_std_dev
[2024-04-08, 09:51:49 IST] {task_command.py:423} INFO - Running <TaskInstance: xxxx_pipeline_dag_v4.gx_validate_xxx_col_std_dev scheduled__2024-04-07T00:00:00+00:00 [running]> on host xxxx-datapipeline-***-xxxxx.xxxxdatapipelin.xxxxxxxx.com
[2024-04-08, 09:51:49 IST] {taskinstance.py:2510} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='xxxx_pipeline_dag_v4' AIRFLOW_CTX_TASK_ID='gx_validate_xxxx_exa_xxxx_col_std_dev' AIRFLOW_CTX_EXECUTION_DATE='2024-04-07T00:00:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-04-07T00:00:00+00:00'
[2024-04-08, 09:51:49 IST] {great_expectations.py:580} INFO - Running validation with Great Expectations...
[2024-04-08, 09:51:49 IST] {great_expectations.py:582} INFO - Instantiating Data Context...
[2024-04-08, 09:51:49 IST] {taskinstance.py:2728} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 444, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 414, in _execute_callable
    return execute_callable(context=context, **execute_callable_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/great_expectations_provider/operators/great_expectations.py", line 586, in execute
    self.data_context = ge.data_context.FileDataContext(
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/great_expectations/data_context/data_context/file_data_context.py", line 64, in __init__
    self._scaffold_project()
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/great_expectations/data_context/data_context/file_data_context.py", line 91, in _scaffold_project
    if self.is_project_scaffolded(self._context_root_directory):
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/great_expectations/data_context/data_context/serializable_data_context.py", line 513, in is_project_scaffolded
    and cls.config_variables_yml_exist(ge_dir)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/great_expectations/data_context/data_context/serializable_data_context.py", line 261, in config_variables_yml_exist
    config_var_path = config.get("config_variables_file_path")
                      ^^^^^^^^^^
AttributeError: 'NoneType' object has no attribute 'get'

Another random failure log:

:
:
[2024-04-08, 22:21:00 IST] {taskinstance.py:2193} INFO - Starting attempt 1 of 1
[2024-04-08, 22:21:00 IST] {taskinstance.py:2214} INFO - Executing <Task(GreatExpectationsOperator): gx_validate_xxx_col_not_null> on 2024-04-08 16:50:34.740105+00:00
[2024-04-08, 22:21:00 IST] {standard_task_runner.py:60} INFO - Started process 1539585 to run task
[2024-04-08, 22:21:00 IST] {standard_task_runner.py:87} INFO - Running: ['***', 'tasks', 'run', 'xxx_pipeline_dag_v4', 'gx_validate_xxx_col_not_null', 'manual__2024-04-08T16:50:34.740105+00:00', '--job-id', '633', '--raw', '--subdir', 'DAGS_FOLDER/xxx_dags/xxx_pipeline_dag_v4.py', '--cfg-path', '/tmp/tmpdgy8dl6u']
[2024-04-08, 22:21:00 IST] {standard_task_runner.py:88} INFO - Job 633: Subtask gx_validate_xxx_col_not_null
[2024-04-08, 22:21:00 IST] {task_command.py:423} INFO - Running <TaskInstance: xxx_pipeline_dag_v4.gx_validate_xxx_col_not_null manual__2024-04-08T16:50:34.740105+00:00 [running]> on host xxx-datapipeline-***-private.sub03080733021.xxx.com
[2024-04-08, 22:21:00 IST] {taskinstance.py:2510} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='xxx_pipeline_dag_v4' AIRFLOW_CTX_TASK_ID='gx_validate_xxx_col_not_null' AIRFLOW_CTX_EXECUTION_DATE='2024-04-08T16:50:34.740105+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-04-08T16:50:34.740105+00:00'
[2024-04-08, 22:21:00 IST] {great_expectations.py:580} INFO - Running validation with Great Expectations...
[2024-04-08, 22:21:00 IST] {great_expectations.py:582} INFO - Instantiating Data Context...
[2024-04-08, 22:21:00 IST] {logging_mixin.py:188} INFO - /home/xxx/workspace/***/gx/great_expectations.yml
[2024-04-08, 22:21:00 IST] {logging_mixin.py:188} INFO - ordereddict([('config_version', 3.0), ('datasources', ordereddict([('xxx_CLEANSED_runtime_datasource', ordereddict([('class_name', 'Datasource'), ('module_name', 'great_expectations.datasource'), ('execution_engine', ordereddict([('class_name', 'PandasExecutionEngine'), ('module_name', 'great_expectations.execution_engine')])), ('data_connectors', ordereddict([('default_runtime_connector', ordereddict([('name', 'default_runtime_connector'), ('class_name', 'RuntimeDataConnector'), ('module_name', 'great_expectations.datasource.data_connector'), ('batch_identifiers', ['***_run_id'])]))]))])), ('dynamic_pandas_asset_runtime_datasource', ordereddict([('class_name', 'Datasource'), ('module_name', 'great_expectations.datasource'), ('execution_engine', ordereddict([('class_name', 'PandasExecutionEngine'), ('module_name', 'great_expectations.execution_engine')])), ('data_connectors', ordereddict([('default_runtime_connector', ordereddict([('name', 'default_runtime_connector'), ('class_name', 'RuntimeDataConnector'), ('module_name', 'great_expectations.datasource.data_connector'), ('batch_identifiers', ['***_run_id'])]))]))]))])), ('config_variables_file_path', 'uncommitted/config_variables.yml'), ('plugins_directory', 'plugins/'), ('stores', ordereddict([('expectations_store', ordereddict([('class_name', 'ExpectationsStore'), ('store_backend', ordereddict([('class_name', 'TupleFilesystemStoreBackend'), ('base_directory', 'expectations/')]))])), ('validations_store', ordereddict([('class_name', 'ValidationsStore'), ('store_backend', ordereddict([('class_name', 'TupleFilesystemStoreBackend'), ('base_directory', 'uncommitted/validations/')]))])), ('evaluation_parameter_store', ordereddict([('class_name', 'EvaluationParameterStore')])), ('checkpoint_store', ordereddict([('class_name', 'CheckpointStore'), ('store_backend', ordereddict([('class_name', 'TupleFilesystemStoreBackend'), ('suppress_store_backend_id', True), ('base_directory', 'checkpoints/')]))])), ('profiler_store', ordereddict([('class_name', 'ProfilerStore'), ('store_backend', ordereddict([('class_name', 'TupleFilesystemStoreBackend'), ('suppress_store_backend_id', True), ('base_directory', 'profilers/')]))]))])), ('expectations_store_name', 'expectations_store'), ('validations_store_name', 'validations_store'), ('evaluation_parameter_store_name', 'evaluation_parameter_store'), ('checkpoint_store_name', 'checkpoint_store'), ('data_docs_sites', ordereddict([('local_site', ordereddict([('class_name', 'SiteBuilder'), ('show_how_to_buttons', True), ('store_backend', ordereddict([('class_name', 'TupleFilesystemStoreBackend'), ('base_directory', 'uncommitted/data_docs/local_site/')])), ('site_index_builder', ordereddict([('class_name', 'DefaultSiteIndexBuilder')]))]))])), ('anonymous_usage_statistics', ordereddict([('data_context_id', 'dba4d0fa-ce75-444b-94e5-623ad64aecd1'), ('enabled', True)])), ('fluent_datasources', ordereddict([('filesystem_source_pandas', ordereddict([('type', 'pandas'), ('assets', ordereddict([('xxx_CLEANSED', ordereddict([('type', 'csv'), ('filepath_or_buffer', 'data/xxx_CLEANSED.csv')]))]))])), ('dynamic_pandas', ordereddict([('type', 'pandas'), ('assets', ordereddict([('dynamic_pandas_asset', ordereddict([('type', 'dataframe'), ('batch_metadata', ordereddict())]))]))]))])), ('notebooks', None), ('include_rendered_content', ordereddict([('globally', False), ('expectation_suite', False), ('expectation_validation_result', False)]))])
[2024-04-08, 22:21:00 IST] {base.py:1716} ERROR - Error while processing DataContextConfig: _schema
[2024-04-08, 22:21:00 IST] {base.py:145} ERROR - Encountered errors during loading config.  See ValidationError for more details.
[2024-04-08, 22:21:00 IST] {taskinstance.py:2728} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 444, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 414, in _execute_callable
    return execute_callable(context=context, **execute_callable_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/great_expectations_provider/operators/great_expectations.py", line 586, in execute
    self.data_context = ge.data_context.FileDataContext(
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/great_expectations/data_context/data_context/file_data_context.py", line 66, in __init__
    self._project_config = self._init_project_config(project_config)
                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/great_expectations/data_context/data_context/file_data_context.py", line 111, in _init_project_config
    project_config = FileDataContext._load_file_backed_project_config(
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/great_expectations/data_context/data_context/file_data_context.py", line 213, in _load_file_backed_project_config
    return DataContextConfig.from_commented_map(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/great_expectations/data_context/types/base.py", line 139, in from_commented_map
    config: Union[dict, BYC] = schema_instance.load(commented_map)
                               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/marshmallow/schema.py", line 723, in load
    return self._do_load(
           ^^^^^^^^^^^^^^
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/marshmallow/schema.py", line 909, in _do_load
    self.handle_error(exc, data, many=many, partial=partial)
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/great_expectations/data_context/types/base.py", line 1717, in handle_error
    raise gx_exceptions.InvalidDataContextConfigError(
great_expectations.exceptions.exceptions.InvalidDataContextConfigError: Error while processing DataContextConfig: _schema
[2024-04-08, 22:21:00 IST] {taskinstance.py:1149} INFO - Marking task as FAILED. dag_id=xxx_pipeline_dag_v4, task_id=gx_validate_xxx_col_not_null, execution_date=20240408T165034, start_date=20240408T165100, end_date=20240408T165100
[2024-04-08, 22:21:00 IST] {standard_task_runner.py:107} ERROR - Failed to execute job 633 for task gx_validate_xxx_col_not_null (Error while processing DataContextConfig: _schema; 1539585)
[2024-04-08, 22:21:00 IST] {local_task_job_runner.py:234} INFO - Task exited with return code 1
[2024-04-08, 22:21:01 IST] {taskinstance.py:3309} INFO - 0 downstream tasks scheduled from follow-on schedule check

Test failure due to `apache-airflow-providers-snowflake` 4.3.1

Hey there! I'm one of the core devs from the GX team and wanted to reach out to highlight a test failure we've been seeing in our CI - tests/operators/test_great_expectations.py::test_great_expectations_operator__make_connection_string_snowflake_pkey

As part of our CI, we clone this repository, install dependencies, and run the full suite. I've done some debugging and have come to the conclusion that this is a result of the new 4.3.1 release; testing on 4.3.0 passes all tests.

Here's where it fails:

# test_great_expectations_operator__make_connection_string_snowflake_pkey
>       assert operator.make_connection_configuration() == test_conn_conf

tests/operators/test_great_expectations.py:911: 

Here's the specific part of the snowflake package that fails:

        elif private_key_file:
            private_key_file_path = Path(private_key_file)
            if not private_key_file_path.is_file() or private_key_file_path.stat().st_size == 0:
>               raise ValueError("The private_key_file path points to an empty or invalid file.")
E               ValueError: The private_key_file path points to an empty or invalid file.

/opt/hostedtoolcache/Python/3.8.17/x64/lib/python3.8/site-packages/airflow/providers/snowflake/hooks/snowflake.py:253: ValueError

Error in action StoreValidationResultAction when checkpoint called via GreatExpectationsOperator with "include_unexpected_rows": True

GE version : 0.15.0, 0.15.1

I am calling the GreatExpectationsOperator from one of my Airflow DAGs, where I am passing a checkpoint that has the runtime configuration below:
runtime_configuration: {"result_format": {"result_format": "SUMMARY","include_unexpected_rows": True}}

Since the unexpected rows are also being pulled, the validation result fails to be stored on local disk. My action list configuration in the checkpoint is as below:
action_list:
  - name: store_validation_result
    action:
      class_name: StoreValidationResultAction
  - name: store_evaluation_params
    action:
      class_name: StoreEvaluationParametersAction
  - name: update_data_docs
    action:
      class_name: UpdateDataDocsAction

There is no problem when the include_unexpected_rows is toggled to False.

The error trace is as below:
[2022-04-21 14:23:59,760] {cursor.py:720} INFO - query execution done
[2022-04-21 14:24:00,136] {validator.py:1646} INFO - 1 expectation(s) included in expectation_suite.
[2022-04-21 14:24:00,477] {logging_mixin.py:109} WARNING -
Calculating Metrics: 0%| | 0/12 [00:00<?, ?it/s]
[2022-04-21 14:24:00,604] {cursor.py:696} INFO - query: [SHOW /* sqlalchemy:_get_schema_primary_keys */ PRIMARY KEYS IN SCHEMA sample_db.pub...]
[2022-04-21 14:24:02,266] {cursor.py:720} INFO - query execution done
[2022-04-21 14:24:02,370] {cursor.py:696} INFO - query: [SELECT /* sqlalchemy:_get_schema_columns */ ic.table_name, ic.column_name, ic.da...]
[2022-04-21 14:24:03,873] {cursor.py:720} INFO - query execution done
[2022-04-21 14:24:09,438] {logging_mixin.py:109} WARNING -
Calculating Metrics: 17%|#6 | 2/12 [00:08<00:44, 4.43s/it]
[2022-04-21 14:24:09,465] {cursor.py:696} INFO - query: [SELECT count(*) AS "table.row_count" FROM ge_temp_284c970f]
[2022-04-21 14:24:09,967] {cursor.py:720} INFO - query execution done
[2022-04-21 14:24:10,178] {logging_mixin.py:109} WARNING -
Calculating Metrics: 33%|###3 | 4/12 [00:09<00:16, 2.04s/it]
[2022-04-21 14:24:10,237] {cursor.py:696} INFO - query: [SELECT c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegme...]
[2022-04-21 14:24:10,782] {cursor.py:720} INFO - query execution done
[2022-04-21 14:24:11,241] {cursor.py:696} INFO - query: [SELECT c_nationkey AS unexpected_values FROM ge_temp_284c970f WHERE c_nationkey ...]
[2022-04-21 14:24:11,770] {cursor.py:720} INFO - query execution done
[2022-04-21 14:24:13,527] {logging_mixin.py:109} WARNING -
Calculating Metrics: 83%|########3 | 10/12 [00:12<00:01, 1.03it/s]
[2022-04-21 14:24:13,641] {cursor.py:696} INFO - query: [SELECT sum(CASE WHEN (c_nationkey IS NULL) THEN 1 ELSE 0 END) AS "column_values....]
[2022-04-21 14:24:14,463] {cursor.py:720} INFO - query execution done
[2022-04-21 14:24:15,786] {logging_mixin.py:109} WARNING -
Calculating Metrics: 100%|##########| 12/12 [00:15<00:00, 1.00s/it]
[2022-04-21 14:24:15,909] {logging_mixin.py:109} WARNING -
Calculating Metrics: 100%|##########| 12/12 [00:15<00:00, 1.28s/it]
[2022-04-21 14:24:15,912] {logging_mixin.py:109} WARNING -
[2022-04-21 14:24:16,255] {validation_operators.py:465} ERROR - Error running action with name store_validation_result
Traceback (most recent call last):
File "/root/.local/lib/python3.6/site-packages/great_expectations/validation_operators/validation_operators.py", line 453, in _run_actions
checkpoint_identifier=checkpoint_identifier,
File "/root/.local/lib/python3.6/site-packages/great_expectations/checkpoint/actions.py", line 78, in run
**kwargs,
File "/root/.local/lib/python3.6/site-packages/great_expectations/checkpoint/actions.py", line 830, in _run
expectation_suite_id=expectation_suite_ge_cloud_id,
File "/root/.local/lib/python3.6/site-packages/great_expectations/data_context/store/store.py", line 161, in set
self.key_to_tuple(key), self.serialize(key, value), **kwargs
File "/root/.local/lib/python3.6/site-packages/great_expectations/data_context/store/validations_store.py", line 168, in serialize
value, indent=2, sort_keys=True
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/schema.py", line 584, in dumps
serialized = self.dump(obj, many=many)
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/schema.py", line 560, in dump
result = self._serialize(processed_obj, many=many)
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/schema.py", line 524, in _serialize
value = field_obj.serialize(attr_name, obj, accessor=self.get_attribute)
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/fields.py", line 309, in serialize
return self._serialize(value, attr, obj, **kwargs)
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/fields.py", line 697, in _serialize
return [self.inner._serialize(each, attr, obj, **kwargs) for each in value]
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/fields.py", line 697, in
return [self.inner._serialize(each, attr, obj, **kwargs) for each in value]
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/fields.py", line 564, in _serialize
return schema.dump(nested_obj, many=many)
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/schema.py", line 555, in dump
PRE_DUMP, obj, many=many, original_data=obj
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/schema.py", line 1075, in _invoke_dump_processors
tag, pass_many=False, data=data, many=many, original_data=original_data
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/schema.py", line 1231, in _invoke_processors
data = processor(data, many=many, **kwargs)
File "/root/.local/lib/python3.6/site-packages/great_expectations/core/expectation_validation_result.py", line 253, in convert_result_to_serializable
data.result = convert_to_json_serializable(data.result)
File "/root/.local/lib/python3.6/site-packages/great_expectations/core/util.py", line 161, in convert_to_json_serializable
new_dict[str(key)] = convert_to_json_serializable(data[key])
File "/root/.local/lib/python3.6/site-packages/great_expectations/core/util.py", line 168, in convert_to_json_serializable
new_list.append(convert_to_json_serializable(val))
File "/root/.local/lib/python3.6/site-packages/great_expectations/core/util.py", line 257, in convert_to_json_serializable
f"{str(data)} is of type {type(data).name} which cannot be serialized."
TypeError: (891097, 'Customer#0007', 'eRL', 21, '31-843-843', Decimal('50.04'), 'FUTURE', 'some junk string for testing') is of type RowProxy which cannot be serialized.
[2022-04-21 14:24:16,838] {taskinstance.py:1463} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1165, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1283, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1313, in _execute_task
result = task_copy.execute(context=context)
File "/root/.local/lib/python3.6/site-packages/great_expectations_provider/operators/great_expectations.py", line 160, in execute
result = self.checkpoint.run()
File "/root/.local/lib/python3.6/site-packages/great_expectations/core/usage_statistics/usage_statistics.py", line 287, in usage_statistics_wrapped_method
result = func(*args, **kwargs)
File "/root/.local/lib/python3.6/site-packages/great_expectations/checkpoint/checkpoint.py", line 167, in run
validation_dict=validation_dict,
File "/root/.local/lib/python3.6/site-packages/great_expectations/checkpoint/checkpoint.py", line 367, in _run_validation
**operator_run_kwargs,
File "/root/.local/lib/python3.6/site-packages/great_expectations/core/async_executor.py", line 100, in submit
return AsyncResult(value=fn(*args, **kwargs))
File "/root/.local/lib/python3.6/site-packages/great_expectations/validation_operators/validation_operators.py", line 392, in run
checkpoint_identifier=checkpoint_identifier,
File "/root/.local/lib/python3.6/site-packages/great_expectations/validation_operators/validation_operators.py", line 466, in _run_actions
raise e
File "/root/.local/lib/python3.6/site-packages/great_expectations/validation_operators/validation_operators.py", line 453, in _run_actions
checkpoint_identifier=checkpoint_identifier,
File "/root/.local/lib/python3.6/site-packages/great_expectations/checkpoint/actions.py", line 78, in run
**kwargs,
File "/root/.local/lib/python3.6/site-packages/great_expectations/checkpoint/actions.py", line 830, in _run
expectation_suite_id=expectation_suite_ge_cloud_id,
File "/root/.local/lib/python3.6/site-packages/great_expectations/data_context/store/store.py", line 161, in set
self.key_to_tuple(key), self.serialize(key, value), **kwargs
File "/root/.local/lib/python3.6/site-packages/great_expectations/data_context/store/validations_store.py", line 168, in serialize
value, indent=2, sort_keys=True
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/schema.py", line 584, in dumps
serialized = self.dump(obj, many=many)
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/schema.py", line 560, in dump
result = self._serialize(processed_obj, many=many)
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/schema.py", line 524, in _serialize
value = field_obj.serialize(attr_name, obj, accessor=self.get_attribute)
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/fields.py", line 309, in serialize
return self._serialize(value, attr, obj, **kwargs)
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/fields.py", line 697, in _serialize
return [self.inner._serialize(each, attr, obj, **kwargs) for each in value]
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/fields.py", line 697, in
return [self.inner._serialize(each, attr, obj, **kwargs) for each in value]
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/fields.py", line 564, in _serialize
return schema.dump(nested_obj, many=many)
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/schema.py", line 555, in dump
PRE_DUMP, obj, many=many, original_data=obj
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/schema.py", line 1075, in _invoke_dump_processors
tag, pass_many=False, data=data, many=many, original_data=original_data
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/schema.py", line 1231, in invoke_processors
data = processor(data, many=many, **kwargs)
File "/root/.local/lib/python3.6/site-packages/great_expectations/core/expectation_validation_result.py", line 253, in convert_result_to_serializable
data.result = convert_to_json_serializable(data.result)
File "/root/.local/lib/python3.6/site-packages/great_expectations/core/util.py", line 161, in convert_to_json_serializable
new_dict[str(key)] = convert_to_json_serializable(data[key])
File "/root/.local/lib/python3.6/site-packages/great_expectations/core/util.py", line 168, in convert_to_json_serializable
new_list.append(convert_to_json_serializable(val))
File "/root/.local/lib/python3.6/site-packages/great_expectations/core/util.py", line 257, in convert_to_json_serializable
f"{str(data)} is of type {type(data).name} which cannot be serialized."
TypeError: (891097, 'Customer#0007', 'eRL', 21, '31-843-843', Decimal('50.04'), 'FUTURE', 'some junk string for testing') is of type RowProxy which cannot be serialized.
[2022-04-21 14:24:17,362] {taskinstance.py:1513} INFO - Marking task as FAILED. dag_id=ge_test_01, task_id=validation_customer_table, execution_date=20220421T142326, start_date=20220421T142338, end_date=20220421T142417
[2022-04-21 14:24:22,341] {local_task_job.py:151} INFO - Task exited with return code 1
[2022-04-21 14:24:23,283] {local_task_job.py:261} INFO - 0 downstream tasks scheduled from follow-on schedule check

I think calling the BashOperator to run the checkpoint could be a workaround, but we want to avoid the BashOperator.
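For reference, a rough sketch of that workaround (assuming the great_expectations CLI is available in the worker environment; the project path and checkpoint name are placeholders, and the exact CLI invocation may vary by GE version):

from airflow.operators.bash import BashOperator

# Run the checkpoint through the Great Expectations CLI instead of the provider operator,
# so result handling stays entirely inside the GE project configuration.
run_checkpoint = BashOperator(
    task_id="run_checkpoint_via_cli",
    bash_command=(
        "cd /usr/local/airflow/include && "
        "great_expectations checkpoint run my_checkpoint"
    ),
)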

Thanks in advance !!

Parallel GreatExpectationsOperator tasks corrupt great_expectations.yml

I have several GreatExpectationsOperator tasks running concurrently. I would often get an error letting me know that the great_expectations.yml file could not be parsed. Inspecting the file, I noticed it would typically end with a truncated fragment such as ult: false, which corresponded with the parsing error on the task's Airflow log page.

I suspect a race condition occurs when the great_expectations.yml file is regenerated to update the datasource section for a job. After changing the GreatExpectationsOperator tasks to run one at a time, the great_expectations.yml file has not been corrupted again.
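One hedged way to enforce that serialization without reordering the DAG is to put every GreatExpectationsOperator task into a single-slot Airflow pool (the pool name below is a placeholder and has to be created in the Airflow UI or CLI first; the other arguments are illustrative):

from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator

validate_orders = GreatExpectationsOperator(
    task_id="validate_orders",
    data_context_root_dir="/usr/local/airflow/include/great_expectations",
    checkpoint_name="orders_checkpoint",
    # With a one-slot pool, only one of these tasks runs at a time, so they
    # never rewrite great_expectations.yml concurrently.
    pool="great_expectations_pool",
)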

TypeError: can't pickle _thread.lock objects when cleaning

This is most likely an issue with Airflow itself (we use 1.10.9), but we didn't have any similar issue before: on clearing a GE operator task, the whole system goes kaboom with this log:

Node: airflow-airflow-web-6f4fbbfc58-cnx4j
-------------------------------------------------------------------------------
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 2446, in wsgi_app
    response = self.full_dispatch_request()
  File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 1951, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 1820, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "/usr/local/lib/python3.7/site-packages/flask/_compat.py", line 39, in reraise
    raise value
  File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 1949, in full_dispatch_request
    rv = self.dispatch_request()
  File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 1935, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/usr/local/lib/python3.7/site-packages/flask_admin/base.py", line 69, in inner
    return self._run_view(f, *args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/flask_admin/base.py", line 368, in _run_view
    return fn(self, *args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/flask_login/utils.py", line 261, in decorated_view
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/www/utils.py", line 290, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/www/utils.py", line 337, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/www/views.py", line 1296, in clear
    include_upstream=upstream)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/dag.py", line 1211, in sub_dag
    for t in regex_match + also_include}
  File "/usr/local/lib/python3.7/site-packages/airflow/models/dag.py", line 1211, in <dictcomp>
    for t in regex_match + also_include}
  File "/usr/local/lib/python3.7/copy.py", line 161, in deepcopy
    y = copier(memo)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 681, in __deepcopy__
    setattr(result, k, copy.deepcopy(v, memo))
  File "/usr/local/lib/python3.7/copy.py", line 180, in deepcopy
    y = _reconstruct(x, memo, *rv)
  File "/usr/local/lib/python3.7/copy.py", line 280, in _reconstruct
    state = deepcopy(state, memo)
  File "/usr/local/lib/python3.7/copy.py", line 150, in deepcopy
    y = copier(x, memo)
  File "/usr/local/lib/python3.7/copy.py", line 240, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/local/lib/python3.7/copy.py", line 180, in deepcopy
    y = _reconstruct(x, memo, *rv)
  File "/usr/local/lib/python3.7/copy.py", line 280, in _reconstruct
    state = deepcopy(state, memo)
  File "/usr/local/lib/python3.7/copy.py", line 150, in deepcopy
    y = copier(x, memo)
  File "/usr/local/lib/python3.7/copy.py", line 240, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/local/lib/python3.7/copy.py", line 180, in deepcopy
    y = _reconstruct(x, memo, *rv)
  File "/usr/local/lib/python3.7/copy.py", line 280, in _reconstruct
    state = deepcopy(state, memo)
  File "/usr/local/lib/python3.7/copy.py", line 150, in deepcopy
    y = copier(x, memo)
  File "/usr/local/lib/python3.7/copy.py", line 240, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/local/lib/python3.7/copy.py", line 169, in deepcopy
    rv = reductor(4)
TypeError: can't pickle _thread.lock objects

Build data_context object in `__init__()` and not in `execute` method

Right now, the self.data_context object is initialized within the execute method of the airflow BaseOperator.

This is done in:

However, this makes it impossible to interact with the data context before or after execution.

If self.data_context were initialized in the __init__() method, the user could interact with the object in the pre_execute() or post_execute() methods of the Airflow BaseOperator.

A possible use case, for example, is to add ExpectationSuites at runtime using an InMemoryStoreBackend expectations store:

    def pre_execute(self, context: Any):
        """
        Create and add an expectation suite to the in-memory DataContext.
        """
        suite = self.data_context.create_expectation_suite(suite_name=suite_name, overwrite_existing=True)

        # Add expectations
        # Here we'll add a simple expectation as an example
        suite.add_expectation(
            expectation_type="expect_table_row_count_to_be_between",
            kwargs={
                "min_value": 1,
                "max_value": 1000000
            }
        )

        # Save the suite to the DataContext's in-memory expectations store
        self.data_context.save_expectation_suite(suite)

Add operator extra link for Data Docs in the `GreatExpectationsOperator`

In Airflow, there is a feature which allows for external links to be present as buttons on the Task Instance modal in the Airflow UI called an operator extra link. This feature is really useful to directly navigate from the Airflow UI to an external site which may be for monitoring a third-party execution that Airflow just triggered, handy documentation, etc.

IMO linking to Data Docs directly from a GreatExpectationsOperator task would be really beneficial to users and increase the holistic visibility that's available at their fingertips.
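As a rough sketch of what such a link could look like (the URL is a placeholder, and the registration details, via the operator's operator_extra_links attribute and an Airflow plugin, vary by Airflow version):

from airflow.models.baseoperator import BaseOperatorLink


class DataDocsLink(BaseOperatorLink):
    """Renders a 'Data Docs' button on the task instance modal in the Airflow UI."""

    name = "Data Docs"

    def get_link(self, operator, *, ti_key):
        # In practice this URL could be derived from the operator's Data Context
        # configuration; here it is just a hard-coded placeholder.
        return "https://example.com/data_docs/index.html"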

GreatExpectationsOperator Performance Issue

Is your feature request related to a problem? Please describe.
The GreatExpectationsOperator for Airflow currently instantiates a DataContext in the __init__() method, which gets run every time the task is parsed (which is frequently), slowing down the DAG it is a part of.

Describe the solution you'd like
Move instantiation of DataContext and Checkpoint to the execute() method of the Operator.
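Conceptually, the change looks like this (a simplified sketch using the classic file-based DataContext API, not the actual provider code):

from airflow.models.baseoperator import BaseOperator
import great_expectations as ge


class LazyGreatExpectationsOperator(BaseOperator):
    """Illustration only: defer expensive setup from parse time to run time."""

    def __init__(self, data_context_root_dir, checkpoint_name, **kwargs):
        super().__init__(**kwargs)
        # __init__ runs on every DAG parse, so keep it to cheap attribute assignments.
        self.data_context_root_dir = data_context_root_dir
        self.checkpoint_name = checkpoint_name

    def execute(self, context):
        # Heavy objects (DataContext, Checkpoint) are built only when the task runs.
        data_context = ge.data_context.DataContext(self.data_context_root_dir)
        checkpoint = data_context.get_checkpoint(self.checkpoint_name)
        return checkpoint.run()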

I will open a PR on this shortly, feel free to assign to me.

Add Trino support

Hi,
Would you consider adding Trino connection to the operator?

Apparently there is a TODO, but I don't know if it's on your foreseeable roadmap.

Thanks

Discussion: Version Release Dates

Hi @denimalpaca @kaxil, thanks again for approving the PR and maintaining this operator. We are actively spiking and trying to onboard the great_expectations Airflow operator into our DQ system, and we are happy to keep contributing to this fantastic library. We are wondering whether there are any rules for new version releases. For example, how often is a new version/tag released? How is the version number kept aligned with the great_expectations Python library?

Thanks,

Remove request to database from init method

The GreatExpectationsBigQueryOperator makes a get_connection request in the __init__ method of the class. This is problematic for a few reasons:

  • It causes a fatal DAG import error if the connection string is not defined.
  • It consumes unnecessary scheduler resources. Every time the Airflow scheduler parses a DAG, Airflow executes whatever is contained in the __init__ method of your class. If the __init__ method makes requests to the database or over the network, it places an unnecessary burden on the database.

The general pattern to follow is to have __init__ store the conn_id string and build the hook via get_connection at runtime, as in the sketch below.
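A minimal sketch of that pattern (illustrative only, using Airflow 2.x imports, not the actual operator code):

from airflow.hooks.base import BaseHook
from airflow.models.baseoperator import BaseOperator


class ExampleBigQueryValidationOperator(BaseOperator):
    """Illustrates deferring connection lookups until run time."""

    def __init__(self, bigquery_conn_id, **kwargs):
        super().__init__(**kwargs)
        # Parse time: only remember the connection *name*; no metadata DB access here.
        self.bigquery_conn_id = bigquery_conn_id

    def execute(self, context):
        # Run time: now it is safe to hit the Airflow metadata database.
        conn = BaseHook.get_connection(self.bigquery_conn_id)
        self.log.info("Using connection %s (host=%s)", self.bigquery_conn_id, conn.host)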

Airflow deployment - Using GreatExpectationsOperator: `great_expectations.data_context.store` does not contain the class: `ProfilerStore`

Hi guys,
I currently get an error when I try to use the official Airflow operator for Great Expectations.

My packages:
airflow-provider-great-expectations==0.1.4
great-expectations==0.15.17
apache-airflow=2.1.4


---> [2022-08-05 12:49:47,094] {util.py:153} CRITICAL - Error The module: great_expectations.data_context.store does not contain the class: ProfilerStore.

Can you please help?

Case-sensitive code for ODBC Driver extras

I'm using the ODBC Driver 18 for SQL Server and have it listed as an extra under "Driver:" in my Airflow connection. This would be correct if the source code for the make_connection_configuration method in great_expectations.py didn't specify that the extra should be a lower-case "driver". As soon as I noticed this detail and changed it in my connection, the GX operator worked perfectly. To prevent this problem from occurring, the code should be changed to allow variations in the capitalization of the "driver" extra; see the sketch after the snippet below.

...
elif conn_type == "mssql":
    odbc_connector = "mssql+pyodbc"
    ms_driver = self.conn.extra_dejson.get("driver") or "ODBC Driver 17 for SQL Server"
    driver = f"?driver={ms_driver}"
    database_name = self.conn.schema or "master"
...
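A hedged sketch of the suggested fix, normalizing the extra keys so any capitalization of "driver" is accepted (not the actual provider code):

def get_mssql_driver(extra_dejson: dict) -> str:
    """Return the ODBC driver name regardless of how the 'driver' extra key is capitalized."""
    extras = {key.lower(): value for key, value in extra_dejson.items()}
    return extras.get("driver") or "ODBC Driver 17 for SQL Server"


# Both of these resolve to "ODBC Driver 18 for SQL Server":
# get_mssql_driver({"Driver": "ODBC Driver 18 for SQL Server"})
# get_mssql_driver({"driver": "ODBC Driver 18 for SQL Server"})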

GreatExpectationsBigQueryOperator is failing on Airflow

Describe the bug
Hi,
I have deployed Great Expectations with Google Cloud Composer. I've created an Airflow DAG to insert data into a table from another table and then validate that the data is correct. Then I generated a suite file using the scaffold example and added all of the GE files to a GCS bucket. But I'm receiving the error below, even though I'm sure that action_list_operator exists in my great_expectations.yml file.

Error message:
DataContextError: No validation operator 'action_list_operator' was found in your project. Please verify this in your great_expectations.yml.

How to reproduce:

  1. Installed pybigquery, great_expectations, airflow-provider-great-expectations on the composer environment.
  2. Followed this example with the same folder structure.

Environment Versions:
Airflow Version: 1.10.9+composer
Great Expectations Version: 0.13.11

I've also attached my DAG and great_expectations.yml files.

Workaround:
One workaround I tried is creating a custom operator using this code and adding the action_list_operator part manually inside the create_data_context_config function. But I'm receiving another error:

Workaround Error:
IndexError: tuple index out of range

attached_files.zip

Thanks in advance!

Issue in connecting the GX operator with Snowflake data

I am currently using the following version:
airflow-provider-great-expectations==0.2.0

I am trying to run the Great Expectations operator, passing it the Snowflake connection ID and executing the query.

My code throws the following error trace:

AIRFLOW_CTX_DAG_RUN_ID=manual__2023-01-30T10:18:21.654701+00:00
[2023-01-30, 10:18:36 UTC] {great_expectations.py:470} INFO - Running validation with Great Expectations...
[2023-01-30, 10:18:36 UTC] {great_expectations.py:472} INFO - Instantiating Data Context...
[2023-01-30, 10:18:36 UTC] {base.py:71} INFO - Using connection ID 'snowflake_conn' for task execution.
[2023-01-30, 10:18:36 UTC] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/great_expectations_provider/operators/great_expectations.py", line 474, in execute
    self.build_runtime_datasources()
  File "/usr/local/lib/python3.8/site-packages/great_expectations_provider/operators/great_expectations.py", line 368, in build_runtime_datasources
    self.build_runtime_sql_datasource_config_from_conn_id()
  File "/usr/local/lib/python3.8/site-packages/great_expectations_provider/operators/great_expectations.py", line 304, in build_runtime_sql_datasource_config_from_conn_id
    "connection_string": self.make_connection_string(),
  File "/usr/local/lib/python3.8/site-packages/great_expectations_provider/operators/great_expectations.py", line 251, in make_connection_string
    uri_string = f"snowflake://{self.conn.login}:{self.conn.password}@{self.conn.extra_dejson['extra__snowflake__account']}.{self.conn.extra_dejson['extra__snowflake__region']}/{self.conn.extra_dejson['extra__snowflake__database']}/{self.conn.schema}?warehouse={self.conn.extra_dejson['extra__snowflake__warehouse']}&role={self.conn.extra_dejson['extra__snowflake__role']}"  # noqa
KeyError: 'extra__snowflake__account'

On logging the conn in Airflow using the below code:

  @task
  def test_conn():
    from airflow.hooks.base import BaseHook
    conn = BaseHook.get_connection(SNOWFLAKE_CONN_ID)
    task_logger.info(f"connection info {conn.extra_dejson} %s")
Below is the information logged:
[2023-01-30, 10:18:28 UTC] {base.py:71} INFO - Using connection ID 'snowflake_conn' for task execution.
[2023-01-30, 10:18:28 UTC] {test_gx_snowflake.py:265} INFO - connection info {'account': 'rtb82372.us-east-1', 'insecure_mode': False, 'database': 'SFSALES_SFC_SAMPLES_VA3_SAMPLE_DATA', 'warehouse': 'XLARGEWH'} %s
[2023-01-30, 10:18:28 UTC] {python.py:177} INFO - Done. Returned value was: None
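A hedged sketch of a defensive lookup that accepts both the old prefixed keys and the unprefixed keys shown in the log above (illustrative only, not the provider's code):

def get_snowflake_extra(extra_dejson: dict, field: str):
    """Read a Snowflake connection extra whether or not it carries the legacy prefix."""
    return extra_dejson.get(f"extra__snowflake__{field}", extra_dejson.get(field))


# With the connection logged above, get_snowflake_extra(conn.extra_dejson, "account")
# returns 'rtb82372.us-east-1' even though the 'extra__snowflake__' prefix is absent.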

Reference issue at airflow side:
apache/airflow#26764

GreatExpectationsBigQueryOperator - tuple index out of range error

Hello team,

I'm receiving the below error when I try to use GreatExpectationsBigQueryOperator

[2021-05-20 00:23:39,025] {validation_operators.py:405} ERROR - Error running action with name update_data_docs
Traceback (most recent call last):
  File "/opt/python3.6/lib/python3.6/site-packages/great_expectations/validation_operators/validation_operators.py", line 392, in _run_action
    payload=batch_actions_results
  File "/opt/python3.6/lib/python3.6/site-packages/great_expectations/checkpoint/actions.py", line 62, in run
    **kwargs
  File "/opt/python3.6/lib/python3.6/site-packages/great_expectations/checkpoint/actions.py", line 939, in _run
    validation_result_suite_identifier.expectation_suite_identifier
  File "/opt/python3.6/lib/python3.6/site-packages/great_expectations/core/usage_statistics/usage_statistics.py", line 262, in usage_statistics_wrapped_method
    result = func(*args, **kwargs)
  File "/opt/python3.6/lib/python3.6/site-packages/great_expectations/data_context/data_context.py", line 2304, in build_data_docs
    resource_identifiers, build_index=build_index
  File "/opt/python3.6/lib/python3.6/site-packages/great_expectations/render/renderer/site_builder.py", line 293, in build
    site_section_builder.build(resource_identifiers=resource_identifiers)
  File "/opt/python3.6/lib/python3.6/site-packages/great_expectations/render/renderer/site_builder.py", line 388, in build
    source_store_keys = self.source_store.list_keys()
  File "/opt/python3.6/lib/python3.6/site-packages/great_expectations/data_context/store/store.py", line 142, in list_keys
    return [self.tuple_to_key(key) for key in keys_without_store_backend_id]
  File "/opt/python3.6/lib/python3.6/site-packages/great_expectations/data_context/store/store.py", line 142, in <listcomp>
    return [self.tuple_to_key(key) for key in keys_without_store_backend_id]
  File "/opt/python3.6/lib/python3.6/site-packages/great_expectations/data_context/store/store.py", line 110, in tuple_to_key
    return self._key_class.from_tuple(tuple_)
  File "/opt/python3.6/lib/python3.6/site-packages/great_expectations/data_context/types/resource_identifiers.py", line 164, in from_tuple
    RunIdentifier.from_tuple((tuple_[-3], tuple_[-2]))
IndexError: tuple index out of range

Here is my airflow task:

validate_output = GreatExpectationsBigQueryOperator(
    task_id='validate_output',
    gcp_project='<project_id>',
    #data_context=DC,
    gcs_bucket='<my_bucket_id>',
    gcs_datadocs_prefix='dags/playground/test_great_expectations/great_expectations',
    gcs_expectations_prefix='dags/playground/test_great_expectations/great_expectations/expectations',
    gcs_validations_prefix='dags/playground/test_great_expectations/great_expectations',
    expectation_suite_name='test_new_suite',
    bq_dataset_name='data_platform_staging',
    email_to='[email protected]',
    send_alert_email=False,
    bigquery_conn_id='bigquery-greatexpectations',
    table='project_id.test_dataset.test_table',
    dag=dag
)

I'm using the latest great-expectations & airflow-provider-great-expectations dependencies.
I'm using version 1.10.14+composer.

Thanks in advance!

Triggering LegacyCheckpoint without 'validation_operator_name' from GreatExpectationsOperator

@talagluck as I wrote on the Slack channel,
I'm trying to trigger a Slack notification with the GreatExpectationsOperator.
It is necessary to use validation_operator_name to trigger such a notification, and this is not possible with the current configuration:
https://github.com/great-expectations/airflow-provider-great-expectations/blob/e92a613db94aa872957fa8626f6d5e71690fe421/great_expectations_provider/operators/great_expectations.py#L130

Error running GreatExpectationsOperator

Describe the bug
When running a Great Expectations suite on Google Cloud Composer using BigQuery as a datasource, I'm getting the following error:

[2021-07-23 17:40:24,866] {xcom.py:237} ERROR - Could not serialize the XCom value into JSON. If you are using pickles instead of JSON for XCom, then you need to enable pickle support for XCom in your airflow config.
[2021-07-23 17:40:24,866] {taskinstance.py:1457} ERROR - Object of type ValidationOperatorResult is not JSON serializable
Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1113, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1287, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1320, in _execute_task
    self.xcom_push(key=XCOM_RETURN_KEY, value=result)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/session.py", line 65, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1896, in xcom_push
    XCom.set(
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/session.py", line 62, in wrapper
    return func(*args, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/xcom.py", line 88, in set
    value = XCom.serialize_value(value)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/xcom.py", line 235, in serialize_value
    return json.dumps(value).encode('UTF-8')
  File "/opt/python3.8/lib/python3.8/json/__init__.py", line 231, in dumps
    return _default_encoder.encode(obj)
  File "/opt/python3.8/lib/python3.8/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/opt/python3.8/lib/python3.8/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "/opt/python3.8/lib/python3.8/json/encoder.py", line 179, in default
    raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type ValidationOperatorResult is not JSON serializable

To Reproduce
Steps to reproduce the behavior:

  1. Dag File
  2. GE config file
  3. GE suite

Expected behavior
It should run and push the result to XComs

Environment (please complete the following information):

  • Cloud Composer Version: 1.17.0
  • Airflow Version: 2.0.2
  • Great Expectations Version: 0.13.22

Additional context
If I set do_xcom_push to False it works, but then I can't fetch the results after execution.
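
If pushing the full validation result over XCom is required, the Airflow error above points at enabling pickle support for XCom; a minimal sketch of that setting in airflow.cfg (a global setting with security trade-offs, so apply with care):

[core]
enable_xcom_pickling = True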

Collision with another protobuf version

Describe the bug:
A DAG containing the great_expectations operator in an Astronomer Airflow environment doesn't load.
I tried downgrading to protobuf==3.2.1, but several Google packages require the latest (4.2.x) protobuf version.

Steps to reproduce the behavior:
Restart astronomer airflow environment

Expected behavior:
DAG is successfully loaded

Environment (please complete the following information):

Operating System: Linux (docker on vm)
Great Expectations Version: 0.15.46

Additional context:

Broken DAG: [/usr/local/airflow/dags/great_expectations/great_expectations_athena.py] Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/google/cloud/secretmanager_v1/proto/resources_pb2.py", line 57, in <module>
    _descriptor.EnumValueDescriptor(
  File "/usr/local/lib/python3.9/site-packages/google/protobuf/descriptor.py", line 755, in __new__
    _message.Message._CheckCalledFromGeneratedFile()
TypeError: Descriptors cannot not be created directly.
If this call came from a _pb2.py file, your generated code is out of date and must be regenerated with protoc >= 3.19.0.
If you cannot immediately regenerate your protos, some other possible workarounds are:
 1. Downgrade the protobuf package to 3.20.x or lower.
 2. Set PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python (but this will use pure-Python parsing and will be much slower).

More information: https://developers.google.com/protocol-buffers/docs/news/2022-05-06#python-updates

Feature request: Way to pass in schema name to overwrite the schema pulled from the conn_id

Hi :)

I've been exploring how to use the build-the-datasource-from-a-provided-conn_id feature and ran into a small issue:
Our dev (and prod) environment is set up with Snowflake tables located in different schemas, something like:

dev_db
- schema_1
-- table_1
-- table_2
- schema_2
-- table_3
-- table_4
etc

There is currently only one Snowflake connection in the dev environment, which has its schema field left empty. Of course I (or rather the deployment admin) could create one additional Snowflake connection per schema, but this does not seem ideal or scalable.

Because of this I've been trying to find a way to change the schema of the datasource created when passing in a conn_id, but I could not find one, e.g. via a data_context_config with the intended schema.

When I leave the schema in the Airflow connection blank and pass the schema name (TAMARAFINGERLIN) together with the table name (CLUSTERS) to data_asset_name like this:

t1 = GreatExpectationsOperator(
        task_id="t1",
        data_asset_name="TAMARAFINGERLIN.CLUSTERS",
        conn_id="{{conn.galaxy_snowflake_etl}}",
        data_context_root_dir=ge_root_dir,
        expectation_suite_name="CLUSTERS",
    )

the following error happens:

[2022-12-01T19:11:34.849+0000] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1802, in _execute_context
    self.dialect.do_execute(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 719, in do_execute
    cursor.execute(statement, parameters)
  File "/usr/local/lib/python3.9/site-packages/snowflake/connector/cursor.py", line 803, in execute
    Error.errorhandler_wrapper(self.connection, self, error_class, errvalue)
  File "/usr/local/lib/python3.9/site-packages/snowflake/connector/errors.py", line 275, in errorhandler_wrapper
    handed_over = Error.hand_to_other_handler(
  File "/usr/local/lib/python3.9/site-packages/snowflake/connector/errors.py", line 330, in hand_to_other_handler
    cursor.errorhandler(connection, cursor, error_class, error_value)
  File "/usr/local/lib/python3.9/site-packages/snowflake/connector/errors.py", line 209, in default_errorhandler
    raise error_class(
snowflake.connector.errors.ProgrammingError: 090106 (22000): 01a8aedf-0506-3740-0000-6821196f772e: Cannot perform CREATE TEMPTABLE. This session does not have a current schema. Call 'USE SCHEMA', or use a qualified name.

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/great_expectations_provider/operators/great_expectations.py", line 521, in execute
    result = self.checkpoint.run(batch_request=self.batch_request)
  File "/usr/local/lib/python3.9/site-packages/great_expectations/core/usage_statistics/usage_statistics.py", line 304, in usage_statistics_wrapped_method
    result = func(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/great_expectations/checkpoint/checkpoint.py", line 194, in run
    self._run_validation(
  File "/usr/local/lib/python3.9/site-packages/great_expectations/checkpoint/checkpoint.py", line 339, in _run_validation
    validator: Validator = self.data_context.get_validator(
  File "/usr/local/lib/python3.9/site-packages/great_expectations/data_context/data_context/abstract_data_context.py", line 1481, in get_validator
    self.get_batch_list(
  File "/usr/local/lib/python3.9/site-packages/great_expectations/core/usage_statistics/usage_statistics.py", line 304, in usage_statistics_wrapped_method
    result = func(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/great_expectations/data_context/data_context/abstract_data_context.py", line 1666, in get_batch_list
    return datasource.get_batch_list_from_batch_request(batch_request=batch_request)
  File "/usr/local/lib/python3.9/site-packages/great_expectations/datasource/new_datasource.py", line 205, in get_batch_list_from_batch_request
    ) = data_connector.get_batch_data_and_metadata(
  File "/usr/local/lib/python3.9/site-packages/great_expectations/datasource/data_connector/data_connector.py", line 116, in get_batch_data_and_metadata
    batch_data, batch_markers = self._execution_engine.get_batch_data_and_markers(
  File "/usr/local/lib/python3.9/site-packages/great_expectations/execution_engine/sqlalchemy_execution_engine.py", line 1243, in get_batch_data_and_markers
    batch_data = SqlAlchemyBatchData(
  File "/usr/local/lib/python3.9/site-packages/great_expectations/execution_engine/sqlalchemy_batch_data.py", line 161, in __init__
    self._create_temporary_table(
  File "/usr/local/lib/python3.9/site-packages/great_expectations/execution_engine/sqlalchemy_batch_data.py", line 295, in _create_temporary_table
    self._engine.execute(stmt)
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1274, in execute
    return self._exec_driver_sql(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1578, in _exec_driver_sql
    ret = self._execute_context(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1845, in _execute_context
    self._handle_dbapi_exception(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 2026, in _handle_dbapi_exception
    util.raise_(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 207, in raise_
    raise exception
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1802, in _execute_context
    self.dialect.do_execute(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 719, in do_execute
    cursor.execute(statement, parameters)
  File "/usr/local/lib/python3.9/site-packages/snowflake/connector/cursor.py", line 803, in execute
    Error.errorhandler_wrapper(self.connection, self, error_class, errvalue)
  File "/usr/local/lib/python3.9/site-packages/snowflake/connector/errors.py", line 275, in errorhandler_wrapper
    handed_over = Error.hand_to_other_handler(
  File "/usr/local/lib/python3.9/site-packages/snowflake/connector/errors.py", line 330, in hand_to_other_handler
    cursor.errorhandler(connection, cursor, error_class, error_value)
  File "/usr/local/lib/python3.9/site-packages/snowflake/connector/errors.py", line 209, in default_errorhandler
    raise error_class(
sqlalchemy.exc.ProgrammingError: (snowflake.connector.errors.ProgrammingError) 090106 (22000): 01a8aedf-0506-3740-0000-6821196f772e: Cannot perform CREATE TEMPTABLE. This session does not have a current schema. Call 'USE SCHEMA', or use a qualified name.
[SQL: CREATE OR REPLACE TEMPORARY TABLE ge_temp_7a4f3e73 AS SELECT * 
FROM "TAMARAFINGERLIN.CLUSTERS" 
WHERE true]
(Background on this error at: https://sqlalche.me/e/14/f405)
[2022-12-01T19:11:34.888+0000] {taskinstance.py:1401} INFO - Marking task as FAILED. dag_id=GE_TEST, task_id=t1, execution_date=20221201T191130, start_date=20221201T191131, end_date=20221201T191134

I was wondering if there could be a way to override some parameters of the connection provided via the conn_id. This would allow me to map the operator over sets of schema names, table names and expectation suite names without having to have admin access to the deployment or manually create a lot of connections. :)

cc @denimalpaca
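
One workaround that may already be possible, depending on the provider version (a minimal sketch, assuming an operator that exposes a schema argument, as used in a later issue in this list, and reusing the names from the example above): override the blank connection schema directly on the operator so it can be mapped over schema/table pairs.

t1 = GreatExpectationsOperator(
    task_id="t1",
    conn_id="galaxy_snowflake_etl",
    schema="TAMARAFINGERLIN",  # overrides the schema left blank on the connection
    data_asset_name="CLUSTERS",
    data_context_root_dir=ge_root_dir,
    expectation_suite_name="CLUSTERS",
)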

Snowflake Region should be optional

Currently the Great Expectations provider assumes a Snowflake region will always be provided.

Some Snowflake regions (e.g. us-west-2) do not require (or allow) a region in the connection string. Additionally, some regions have multiple deployments, and it then becomes necessary to specify both region and deployment. This is best accomplished by putting the fully qualified name in the account field (i.e. account.deployment.region.cloud). For this to work, the region field must be empty.
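
The connection string such a setup needs would look roughly like this (a sketch of the requested behaviour, with the fully qualified locator placed in the account field and no separate region segment appended by the provider):

uri = (
    f"snowflake://{login}:{password}@{account}"  # account may itself be 'acct.deployment.region.cloud'
    f"/{database}/{schema}?warehouse={warehouse}&role={role}"
)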

How to pass create_temp_table: False to SqlAlchemy?

When using Great Expectations with datasources from great_expectations.yml, you can use the

  create_temp_table: False

setting for SqlAlchemyExecutionEngine to prevent SQLAlchemy from issuing SQL like this:

CREATE OR REPLACE TEMPORARY TABLE gx_temp_d5435 AS select ...

Question:
How can the same be done when using airflow-provider-great-expectations with an Airflow conn_id (which overrides the datasources from great_expectations.yml)?
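
One way to keep this setting when conn_id cannot carry it (a minimal sketch, untested): define the datasource explicitly in a DataContextConfig, where the execution engine config accepts the same flag as great_expectations.yml, and pass it to the operator via data_context_config instead of conn_id.

datasource_config = {
    "class_name": "Datasource",
    "module_name": "great_expectations.datasource",
    "execution_engine": {
        "class_name": "SqlAlchemyExecutionEngine",
        "connection_string": connection_string,  # built from your own credentials
        "create_temp_table": False,
    },
    "data_connectors": {
        "default_runtime_data_connector_name": {
            "class_name": "RuntimeDataConnector",
            "batch_identifiers": ["default_identifier_name"],
        },
    },
}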

SQLAlchemy dependencies issue

Hi,
We have a dependency conflict when using the provider package.

Starting from Airflow 2.3.3, the Airflow constraints file requires SQLAlchemy 1.4.27, while the Great Expectations provider requires a version earlier than 1.4.10.

Can someone please advise? Is this requirement planned to change?
Thanks a lot!

Importing and instrumenting the GreatExpectationsOperator has unintended side effects, causing tasks to go into weird states

Bug

Example log line: [2021-04-19 14:52:15,487] {taskinstance.py:1150} ERROR - Executor reports task instance <TaskInstance: {dag_name}.{task_name} 2021-04-18 07:00:00+00:00 [queued]> finished (failed) although the task says its queued. Was the task killed externally?

I have a DAG with ~40 tasks and recently added Great Expectations to it. As soon as I introduced this to my DAG, weird things occurred: tasks did not properly queue and were marked failed before even going into the RUNNING state. As soon as I remove from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator, my DAG runs successfully without any of these side effects.

Expected Behaviour

Using the GreatExpectationsOperator has no unintended consequences

Environment

Astronomer Cloud and locally in docker-compose
Executor: Kubernetes Executor
Airflow: 1.10.12
Docker Image: quay.io/astronomer/ap-airflow:1.10.12-alpine3.10-onbuild

Expectation suite not found

Hi

I am getting the following error in the logs when trying to run my expectations via airflow.

great_expectations.exceptions.exceptions.DataContextError: expectation_suite accounts not found

import os
import airflow
from airflow import DAG
from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator


default_args = {
  'dir': '/project/great_expectations',
  'start_date': airflow.utils.dates.days_ago(1),
  'profiles_dir': '/home/airflow/.dbt/',
  'catchup': False
}


with DAG(dag_id='accounts_great_expectations', default_args=default_args, schedule_interval='@daily') as dag:
    accounts = GreatExpectationsOperator(
        task_id='accounts',
        expectation_suite_name='accounts',
        data_context_root_dir='/project/great_expectations',
        batch_kwargs={
            'table': 'raw_accounts',
            'datasource': 'fusioncell_local'
        },
        dag=dag
    )


It can find the great_expectations.yml file fine, though, which is in /project/great_expectations, and my expectations are in a directory one level below that called expectations.
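
A quick check that narrows this down (a sketch, assuming the default filesystem expectations store): the suite named accounts should live as a JSON file directly under the expectations directory of the data context root.

import os
suite_path = "/project/great_expectations/expectations/accounts.json"
print(os.path.exists(suite_path))  # False would explain the DataContextError above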

Thanks

Feature Request: run EXPERIMENTAL expectation (from great_expectations_experimental library) from Airflow?

Hi!
Is it possible to run:
1) an EXPERIMENTAL expectation (from the great_expectations_experimental library) from Airflow?

example: expect_queried_column_values_to_exist_in_second_table_column

A simple import into the DAG does not help:

from great_expectations_experimental.expectations.expect_queried_column_values_to_exist_in_second_table_column import ExpectQueriedColumnValuesToExistInSecondTableColumn

  • After the DAG run, I get this text in Data Docs instead of the expectation result:

expect_queried_column_values_to_exist_in_second_table_column(**{'batch_id': '0120cd462e58ed32be35bc92c0ae', 'template_dict': {'condition': '1=1', 'first_table_column': 'PROV_ID', 'second_table_column': 'PROV_ID', 'second_table_full_name': 'LINC'}})

2) A custom expectation from the great_expectations/plugins/expectations folder?
Could it be run from Airflow, and how?
https://docs.greatexpectations.io/docs/guides/expectations/creating_custom_expectations/how_to_use_custom_expectations/

data_asset_name is not recognized in airflow-provider-great-expectations==0.2.0

Hi team, we are working on integrating GX with a Snowflake datasource into our data validation system via GreatExpectationsOperator.

We are planning to run some expectation validations against a Snowflake table named test_sf_table. However, we are getting KeyError: 'data_asset_name test_sf_table is not recognized.' when running our DAG. We have tried both upper and lower case, with and without the schema, such as data_asset_name: <schema_name>.<table_name>.

Does anyone know what the issue could be? Or is there a configuration issue in my data_context_config or checkpoint_config? Any help would be greatly appreciated!

Detailed Info:
we are using
airflow-provider-great-expectations==0.2.0

datasource_config:

sf_url = f'snowflake://{username}:{password}@{account}.{region}/{database}/{schema}?warehouse={warehouse}&role={role}&application=great_expectations_oss'

sf_datasource_config = {
        "class_name": "Datasource",
        "module_name": "great_expectations.datasource",
        "execution_engine": {
            "class_name": "SqlAlchemyExecutionEngine",
            "connection_string": sf_url,
        },
        "data_connectors": {
            "default_runtime_data_connector_name": {
                "class_name": "RuntimeDataConnector",
                "batch_identifiers": ["default_identifier_name"],
            },
            "default_inferred_data_connector_name": {
                "class_name": "InferredAssetSqlDataConnector",
                "include_schema_name": True,
                "included_tables": f"{schema}.test_sf_table".lower()
            },
        },
    }

data_context_config:

base_path = Path(__file__).parents[3]
ge_root_dir = os.path.join(base_path, "include", "great_expectations")
snowflake_data_context_config = DataContextConfig(
    **{
        "config_version": 3.0,
        "datasources": {
            "my_snowflake_datasource": sf_datasource_config
        },
        "stores": {
            "expectations_store": {
                "class_name": "ExpectationsStore",
                "store_backend": {
                    "class_name": "TupleFilesystemStoreBackend",
                    "base_directory": os.path.join(ge_root_dir, "expectations"),
                },
            },
            "validations_store": {
                "class_name": "ValidationsStore",
                "store_backend": {
                    "class_name": "TupleFilesystemStoreBackend",
                    "base_directory": os.path.join(
                        ge_root_dir, "uncommitted", "validations"
                    ),
                },
            },
            "evaluation_parameter_store": {"class_name": "EvaluationParameterStore"},
            "checkpoint_store": {
                "class_name": "CheckpointStore",
                "store_backend": {
                    "class_name": "TupleFilesystemStoreBackend",
                    "suppress_store_backend_id": True,
                    "base_directory": os.path.join(ge_root_dir, "checkpoints"),
                },
            },
        },
        "expectations_store_name": "expectations_store",
        "validations_store_name": "validations_store",
        "evaluation_parameter_store_name": "evaluation_parameter_store",
        "checkpoint_store_name": "checkpoint_store",
        "data_docs_sites": {
            "local_site": {
                "class_name": "SiteBuilder",
                "show_how_to_buttons": True,
                "store_backend": {
                    "class_name": "TupleFilesystemStoreBackend",
                    "base_directory": os.path.join(
                        ge_root_dir, "uncommitted", "data_docs", "local_site"
                    ),
                },
                "site_index_builder": {"class_name": "DefaultSiteIndexBuilder"},
            }
        },
        "anonymous_usage_statistics": {
            "data_context_id": "abcdabcd-1111-2222-3333-abcdabcdabcd",
            "enabled": True,
        },
        "notebooks": None,
        "concurrency": {"enabled": False},
    }
)

checkpoint_config:

snowflake_checkpoint_config = CheckpointConfig(
    **{
        "name": "test_sf_checkpoint",
        "config_version": 1.0,
        "template_name": None,
        "module_name": "great_expectations.checkpoint",
        "class_name": "Checkpoint",
        "run_name_template": "%Y%m%d-%H%M%S-test-sf-checkpoint",
        "expectation_suite_name": "sf_test.demo",
        "action_list": [
            {
                "name": "store_validation_result",
                "action": {"class_name": "StoreValidationResultAction"},
            },
            {
                "name": "store_evaluation_params",
                "action": {"class_name": "StoreEvaluationParametersAction"},
            },
            {
                "name": "update_data_docs",
                "action": {"class_name": "UpdateDataDocsAction", "site_names": []},
            },
        ],
        "evaluation_parameters": {},
        "runtime_configuration": {},
        "validations": [
            {
                "batch_request": {
                    "datasource_name": "my_snowflake_datasource",
                    "data_connector_name": "default_inferred_data_connector_name",
                    "data_asset_name": "test_sf_table".lower(),
                    "data_connector_query": {"index": -1},
                },
            }
        ],
        "profilers": [],
        "ge_cloud_id": None,
        "expectation_suite_ge_cloud_id": None,
    }
)

operator:

ge_snowflake_validation = GreatExpectationsOperator(
    task_id="test_snowflake_validation",
    data_context_config=snowflake_data_context_config,
    checkpoint_config=snowflake_checkpoint_config
)

Passing value_set through Evaluation Parameters crashes if list size is more than 100

Dialect Used: Snowflake
Expectation Used : expect_column_values_to_be_in_set

Example of expectation:

"expectations": [
    {
      "expectation_type": "expect_column_values_to_be_in_set",
      "kwargs": {
        "column": "column_A",
        "value_set": {
          "$PARAMETER": "valid_ids"
        }
      },
      "meta": {
        "id": "test_unknown_ids"
      }
    }
  ],

Problem:
If the size of the list passed to this expectation through evaluation parameters (not loading params from the db, but from a Python list) crosses 100, it breaks the underlying JSON.

Error Message:

sqlalchemy.exc.DataError: (psycopg2.errors.InvalidTextRepresentation) invalid input syntax for type json
LINE 1: ...rendered_task_instance_fields SET rendered_fields='{"run_nam...
                                                             ^
DETAIL:  Token "Infinity" is invalid.
CONTEXT:  JSON data, line 1: ...6e5", 150641421, "e6096a21", "e6093099", Infinity...

Question:
Is it a bug that the expectation crashes on a list size of more than 100, or is it desired behaviour?
I have verified that it only crashes when I pass a 100+ item value_set through evaluation params; the expectation works fine if I add the 100+ item list directly into the expectation JSON. So I think it must be something related to the operator's handling of big JSON objects.

Operator won't work in 0.2.1

With the new 0.2.1 release of the Great Expectations Operator we've been getting the below error. We reverted to 0.2.0 and the error went away.

Error:

[2022-11-17, 01:58:38 UTC] {{abstractoperator.py:175}} ERROR - Failed to resolve template field 'data_context_root_dir'
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/airflow/models/abstractoperator.py", line 173, in resolve_template_files
    setattr(self, field, env.loader.get_source(env, content)[0])  # type: ignore
  File "/usr/local/lib/python3.9/site-packages/jinja2/loaders.py", line 218, in get_source
    raise TemplateNotFound(template)
jinja2.exceptions.TemplateNotFound: /usr/src/app/great_expectations

Code:

    ge_data_unit_test_op = GreatExpectationsOperator(
        task_id="test_package",
        data_context_root_dir=ge_root_dir,
        checkpoint_name=test_package,
        checkpoint_kwargs={
            "batch_request": {
                "datasource_name": TARGET_PROJECT_ID,
                "data_asset_name": data_asset_name,
            }
        },
    )

Cannot run validation on BigQuery

Hello!

I'm trying to execute Great Expectations in my airflow instance to run some validations in my BigQuery tables. There's not much content about this on the internet, and I even found some references to an old GreatExpectationsBigQueryOperator.

I've set up my connection on Airflow as a Google Cloud connection and it works for other things (such as querying data), but with the GreatExpectationsOperator I get the error below:

ERROR - Failed to execute job 118 for task gx_validate_products (Conn type: google_cloud_platform is not supported.; 425)

Here's my code:

 gx_validate_products = GreatExpectationsOperator(
        task_id="gx_validate_products",
        conn_id="gcp_connection",
        data_context_root_dir="great_expectations",
        data_asset_name="sample_ecommerce.products",
        expectation_suite_name="raw_products",
        return_json_dict=True,
    )
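
A possible workaround while google_cloud_platform connections are not supported through conn_id (a minimal sketch, untested; the project and dataset names are placeholders): define the datasource yourself with a BigQuery SQLAlchemy URL and pass it in via data_context_config.

bq_datasource_config = {
    "class_name": "Datasource",
    "module_name": "great_expectations.datasource",
    "execution_engine": {
        "class_name": "SqlAlchemyExecutionEngine",
        "connection_string": "bigquery://my-gcp-project/sample_ecommerce",  # placeholder project/dataset
    },
    "data_connectors": {
        "default_inferred_data_connector_name": {
            "class_name": "InferredAssetSqlDataConnector",
            "include_schema_name": True,
        },
    },
}

The operator can then keep data_asset_name="sample_ecommerce.products" and expectation_suite_name="raw_products", but with data_context_config instead of conn_id.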

GreatExpectationsOperator is overriding database name with schema

hello,

Use case: the database name is different from the schema names, and multiple schemas are available.

  1. Trying to connect to the Redshift database main (using conn_id; the database name is passed in the Airflow UI) fails when schema="login" or data_asset_name="schema.table_name" is passed as a parameter to GreatExpectationsOperator.

Error: connection failed for database login.

  2. If schema is not passed as a parameter or with data_asset_name, then I see:

InvalidSchemaName: schema "main" does not exist

(screenshot attached)

`great-expectations>=0.14` causes `commented_map` missing error

I could not check whether this has to do with Airflow 2.4.x, but I'm getting this error when using GreatExpectationsOperator with a CheckpointConfig on great-expectations>=0.14:

[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - Exception in thread Thread-1:
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - Traceback (most recent call last):
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -   File "/usr/local/lib/python3.9/site-packages/great_expectations/types/__init__.py", line 209, in assert_valid_keys
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -     _ = self[name]
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -   File "/usr/local/lib/python3.9/site-packages/great_expectations/types/__init__.py", line 73, in __getitem__
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -     return getattr(self, item)
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -   File "/usr/local/lib/python3.9/site-packages/great_expectations/data_context/types/base.py", line 151, in commented_map
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -     return self._get_schema_validated_updated_commented_map()
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -   File "/usr/local/lib/python3.9/site-packages/great_expectations/data_context/types/base.py", line 125, in _get_schema_validated_updated_commented_map
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -     commented_map: CommentedMap = copy.deepcopy(self._commented_map)
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - AttributeError: 'CheckpointConfig' object has no attribute '_commented_map'
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - 
During handling of the above exception, another exception occurred:
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - Traceback (most recent call last):
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -   File "/usr/local/lib/python3.9/site-packages/great_expectations/types/__init__.py", line 212, in assert_valid_keys
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -     _ = self[f"_{name}"]
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -   File "/usr/local/lib/python3.9/site-packages/great_expectations/types/__init__.py", line 73, in __getitem__
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -     return getattr(self, item)
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - AttributeError: 'CheckpointConfig' object has no attribute '_commented_map'
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - 
During handling of the above exception, another exception occurred:
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - Traceback (most recent call last):
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -   File "/usr/local/lib/python3.9/threading.py", line 980, in _bootstrap_inner
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -     self.run()
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -   File "/usr/local/lib/python3.9/threading.py", line 917, in run
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -     self._target(*self._args, **self._kwargs)
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -   File "/usr/local/lib/python3.9/site-packages/openlineage/airflow/listener.py", line 114, in on_running
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -     **get_custom_facets(task, dagrun.external_trigger, task_instance_copy)
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -   File "/usr/local/lib/python3.9/site-packages/openlineage/airflow/utils.py", line 265, in get_custom_facets
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -     "airflow_version": AirflowVersionRunFacet.from_task(task),
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -   File "/usr/local/lib/python3.9/site-packages/openlineage/airflow/facets.py", line 34, in from_task
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -     to_json_encodable(task),
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -   File "/usr/local/lib/python3.9/site-packages/openlineage/airflow/utils.py", line 101, in to_json_encodable
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -     return json.loads(json.dumps(task.__dict__, default=_task_encoder))
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -   File "/usr/local/lib/python3.9/json/__init__.py", line 234, in dumps
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -     return cls(
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -   File "/usr/local/lib/python3.9/json/encoder.py", line 199, in encode
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -     chunks = self.iterencode(o, _one_shot=True)
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -   File "/usr/local/lib/python3.9/json/encoder.py", line 257, in iterencode
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -     return _iterencode(o, 0)
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -   File "/usr/local/lib/python3.9/site-packages/openlineage/airflow/utils.py", line 99, in _task_encoder
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -     return str(obj)
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -   File "/usr/local/lib/python3.9/site-packages/great_expectations/data_context/types/base.py", line 2883, in __str__
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -     return self.__repr__()
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -   File "/usr/local/lib/python3.9/site-packages/great_expectations/data_context/types/base.py", line 2862, in __repr__
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -     json_dict: dict = self.to_json_dict()
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -   File "/usr/local/lib/python3.9/site-packages/great_expectations/data_context/types/base.py", line 2850, in to_json_dict
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -     dict_obj: dict = self.to_dict()
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -   File "/usr/local/lib/python3.9/site-packages/great_expectations/types/__init__.py", line 140, in to_dict
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -     for key in self.property_names(
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -   File "/usr/local/lib/python3.9/site-packages/great_expectations/types/__init__.py", line 227, in property_names
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -     assert_valid_keys(keys=exclude_keys, purpose="exclusion")
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -   File "/usr/local/lib/python3.9/site-packages/great_expectations/types/__init__.py", line 214, in assert_valid_keys
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -     raise ValueError(
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - ValueError: Property "commented_map", marked for exclusion on object "<class 'great_expectations.data_context.types.base.CheckpointConfig'>", does not exist.

I don't see this error with >=0.15, but with 0.14.x, I see this additional error:

[2022-10-07, 19:46:14 UTC] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/great_expectations/data_context/util.py", line 117, in instantiate_class_from_config
    class_instance = class_(**config_with_defaults)
TypeError: __init__() got an unexpected keyword argument 'site_names'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/great_expectations_provider/operators/great_expectations.py", line 142, in execute
    self.checkpoint = instantiate_class_from_config(
  File "/usr/local/lib/python3.9/site-packages/great_expectations/data_context/util.py", line 121, in instantiate_class_from_config
    class_name, format_dict_for_error_message(config_with_defaults)
  File "/usr/local/lib/python3.9/site-packages/great_expectations/data_context/util.py", line 165, in format_dict_for_error_message
    return "\n\t".join("\t\t".join((str(key), str(dict_[key]))) for key in dict_)
  File "/usr/local/lib/python3.9/site-packages/great_expectations/data_context/util.py", line 165, in <genexpr>
    return "\n\t".join("\t\t".join((str(key), str(dict_[key]))) for key in dict_)
  File "/usr/local/lib/python3.9/site-packages/great_expectations/core/config_peer.py", line 80, in __repr__
    return str(self.get_config())
  File "/usr/local/lib/python3.9/site-packages/great_expectations/data_context/types/base.py", line 2130, in __str__
    return self.__repr__()
  File "/usr/local/lib/python3.9/site-packages/great_expectations/data_context/types/base.py", line 2109, in __repr__
    json_dict: dict = self.to_sanitized_json_dict()
  File "/usr/local/lib/python3.9/site-packages/great_expectations/data_context/types/base.py", line 2098, in to_sanitized_json_dict
    serializeable_dict = self.to_json_dict()
  File "/usr/local/lib/python3.9/site-packages/great_expectations/data_context/types/base.py", line 2088, in to_json_dict
    serializeable_dict: dict = convert_to_json_serializable(data=dict_obj)
  File "/usr/local/lib/python3.9/site-packages/great_expectations/core/util.py", line 161, in convert_to_json_serializable
    new_dict[str(key)] = convert_to_json_serializable(data[key])
  File "/usr/local/lib/python3.9/site-packages/great_expectations/core/util.py", line 161, in convert_to_json_serializable
    new_dict[str(key)] = convert_to_json_serializable(data[key])
  File "/usr/local/lib/python3.9/site-packages/great_expectations/core/util.py", line 256, in convert_to_json_serializable
    raise TypeError(
TypeError: <great_expectations.data_context.types.base.DatasourceConfig object at 0x7f4009f096a0> is of type DatasourceConfig which cannot be serialized.

Current environment:
apache-airflow==2.4.1
airflow-provider-great-expectations==0.1.5
great-expectations>=0.14

I confirmed that the same DAG works with great-expectations==0.13.49 (this requires pinning jinja2>=3.0.0,<3.1.0)

Missing `template_fields` in operator

Is:
(no template_fields declared on the operator)

Should be:
template_fields = ('checkpoint_name', 'batch_kwargs', 'assets_to_validate')

Somebody removed the template_fields declaration from the operator, which basically makes the operator useless. Please fix that ASAP and cover it with unit tests to prevent such a situation from happening in the future.

Feature Request: pass parameters from Airflow to GE Checkpoint

We need to run a GE checkpoint from Airflow.
The checkpoint is based on a SQL query.
The SQL query must get values for its parameters from Airflow - e.g. a datamart should be checked for data quality for a particular date and region after that date and region were refreshed by another Airflow task.

Part of checkpoint.yml looks like:

validations:
  - batch_request:
      datasource_name: snowflake
      data_connector_name: default_runtime_data_connector_name
      data_asset_name: db1.table1
      runtime_parameters:
        query: "SELECT *
          from db1.table1
          WHERE fld1 > $DATE_PARAM_FROM_AIRFLOW and fld2 = $REGION_PARAM_FROM_AIRFLOW"

How to do it properly with GreatExpectationsOperator?

It looks like the operator can't pass only the parameters, while query_to_validate or checkpoint_config would break unit tests (you would need Airflow to test your checkpoint!).

Workaround: use environment variables.

Thanks!
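
One approach that avoids environment variables (a sketch, assuming checkpoint_kwargs is a templated field in the provider version in use, and using a hypothetical checkpoint name): inject the Airflow values into the runtime query through checkpoint_kwargs, keeping checkpoint.yml itself parameter-free.

validate_datamart = GreatExpectationsOperator(
    task_id="validate_datamart",
    data_context_root_dir=ge_root_dir,
    checkpoint_name="my_checkpoint",  # hypothetical checkpoint name
    checkpoint_kwargs={
        "validations": [
            {
                "batch_request": {
                    "datasource_name": "snowflake",
                    "data_connector_name": "default_runtime_data_connector_name",
                    "data_asset_name": "db1.table1",
                    "runtime_parameters": {
                        "query": "SELECT * FROM db1.table1 "
                                 "WHERE fld1 > '{{ ds }}' AND fld2 = '{{ params.region }}'",
                    },
                    "batch_identifiers": {"default_identifier_name": "{{ run_id }}"},
                },
            }
        ],
    },
)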

Remove `apply_defaults` from `GreatExpectationsBigQueryOperator` (or remove explicit kwargs)

So apply_defaults throws an error if any args or kwargs which are explicitly mentioned in the function signature are empty (i.e. None). Since GreatExpectationsBigQueryOperator names expectation_suite_name as a positional arg (not variable, i.e. not hidden in *args), it becomes mandatory in the function call.

The problem is when I want to use checkpoints, which have this check in the GreatExpectationsOperator:

# Check that only the correct args to validate are passed
# this doesn't cover the case where only one of expectation_suite_name or batch_kwargs is specified
# along with one of the others, but I'm ok with just giving precedence to the correct one
if sum(bool(x) for x in [(expectation_suite_name and batch_kwargs), assets_to_validate, checkpoint_name]) != 1:
    raise ValueError("Exactly one of expectation_suite_name + batch_kwargs, assets_to_validate, \
     or checkpoint_name is required to run validation.")

As a result, I am forced by the GreatExpectationsBigQueryOperator's apply_defaults() call to provide expectation_suite_name, but the GreatExpectationsOperator requires me to name only the checkpoint_name.

Without expectation_suite_name:

airflow.exceptions.AirflowException: Argument ['expectation_suite_name'] is required

With expectation_suite_name:

Traceback (most recent call last):
  File "dags/gfk/test_ge.py", line 54, in <module>
    dag=dag,
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/decorators.py", line 98, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/great_expectations_provider/operators/great_expectations_bigquery.py", line 150, in __init__
    **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/decorators.py", line 98, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/great_expectations_provider/operators/great_expectations.py", line 85, in __init__
    or checkpoint_name is required to run validation.")
ValueError: Exactly one of expectation_suite_name + batch_kwargs, assets_to_validate,              or checkpoint_name is required to run validation.

I wonder why it fails though, since the check seems to allow for one of the values (batch_kwargs or expectation_suite_name) to be set, but it does not work. I will investigate it a bit further tomorrow.

In order to access the expectation_suite_name of the parent class, perhaps just put it back into the variable kwargs, call super(), and access it via self? Or remove apply_defaults?

Versions

  • airflow==1.10.12
  • airflow-provider-great-expectations==0.0.4
  • great-expectations==0.13.19
  • Python 3.7.10

Snowflake connection failure when only using the Airflow UI fields to set the connection

Hi!

I noticed that when only the Airflow UI fields are used to set a connection, the rendered Extra JSON has different keys than what the GXO is looking for.

Extra field automatically rendered from connection UI fields:
{"account": "myacc", "warehouse": "HUMANS", "database": "DWH_DEV", "region": "us-east-1", "role": "myrole", "insecure_mode": false}

The GXO is looking for extra__snowflake__account, extra__snowflake__region, etc. I think this might be how the keys used to be rendered? When I set the connection with these keys, adding extra__snowflake__ in front of the parameters, everything works as expected.

Error trace:

[2023-01-05, 13:37:40 UTC] {great_expectations.py:454} INFO - Instantiating Data Context...
[2023-01-05, 13:37:40 UTC] {base.py:73} INFO - Using connection ID 'galaxy_snowflake_etl' for task execution.
[2023-01-05, 13:37:40 UTC] {taskinstance.py:1772} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/great_expectations_provider/operators/great_expectations.py", line 456, in execute
    self.build_runtime_datasources()
  File "/usr/local/lib/python3.9/site-packages/great_expectations_provider/operators/great_expectations.py", line 366, in build_runtime_datasources
    self.datasource = self.build_configured_sql_datasource_config_from_conn_id()
  File "/usr/local/lib/python3.9/site-packages/great_expectations_provider/operators/great_expectations.py", line 259, in build_configured_sql_datasource_config_from_conn_id
    conn_str = self.make_connection_string()
  File "/usr/local/lib/python3.9/site-packages/great_expectations_provider/operators/great_expectations.py", line 246, in make_connection_string
    uri_string = f"snowflake://{self.conn.login}:{self.conn.password}@{self.conn.extra_dejson['extra__snowflake__account']}.{self.conn.extra_dejson['extra__snowflake__region']}/{self.conn.extra_dejson['extra__snowflake__database']}/{schema}?warehouse={self.conn.extra_dejson['extra__snowflake__warehouse']}&role={self.conn.extra_dejson['extra__snowflake__role']}"  # noqa
KeyError: 'extra__snowflake__account'
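
For reference, the Extra value that does work with this provider version, per the workaround above (same fields, prefixed keys):

{"extra__snowflake__account": "myacc", "extra__snowflake__warehouse": "HUMANS", "extra__snowflake__database": "DWH_DEV", "extra__snowflake__region": "us-east-1", "extra__snowflake__role": "myrole"}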

Can't use `checkpoint_kwargs` with `conn_id`

When I add checkpoint_kwargs to my code it errors, but if I delete checkpoint_kwargs the task runs successfully.
This is my code:

GreatExpectationsOperator(
        task_id="task_1234",
        data_context_root_dir="/opt/airflow/plugins/gx/",
        conn_id="postgres",
        schema="public",
        data_asset_name="table",
        query_to_validate=query_sql,
        expectation_suite_name="sample_suite",
        checkpoint_kwargs={
            "action_list":[],
        },
        return_json_dict=True,
        dag=dag,
    )

ERROR:

File "/home/airflow/.local/lib/python3.12/site-packages/great_expectations/checkpoint/checkpoint.py", line 282, in run
    raise gx_exceptions.CheckpointError(
great_expectations.exceptions.exceptions.CheckpointError: Checkpoint "table.sample_suite.chk" must be called with a validator or contain either a batch_request or validations.

Error initializing a Checkpoint using CheckpointConfig

In great_expectations==0.14.4 the Checkpoint class does not have a module_name class attribute (https://github.com/great-expectations/great_expectations/blob/0.14.4/great_expectations/checkpoint/checkpoint.py#L74). However, when initializing a Checkpoint in the Great Expectations operator (https://github.com/great-expectations/airflow-provider-great-expectations/blob/main/great_expectations_provider/operators/great_expectations.py#L139), the Checkpoint is initialized with all the attributes of checkpoint_config.to_json_dict(), and since CheckpointConfig has a module_name attribute this causes an error when initializing the Checkpoint.

Stack Trace

  File "/usr/local/airflow/.local/lib/python3.7/site-packages/great_expectations_provider/operators/great_expectations.py", line 139, in __init__
    data_context=self.data_context, **self.checkpoint_config.to_json_dict()
TypeError: __init__() got an unexpected keyword argument 'module_name'
