Comments (18)
thanks @denimalpaca, I'm trying it right now.

> Also in your CheckpointConfig, in the batch_request, I'd try removing the `.lower()` in the `data_asset_name` and removing the `"data_connector_query"`; neither seems to be needed. (Just trying to see any potential issues here, I know I've had problems with the config files before.)

In the meantime, like I mentioned in the previous thread, I realized that it was able to connect to my Snowflake and did a lot of table scans (including tables outside the database and schema specified in the `sf_url`). Do you know why this is happening, or is there a way to only scan my specified database and schema? I feel like this could also be an issue, since there are tons of tables in Snowflake and it could exceed the limit?
Let me quote part of my previous reply:

> I noticed in the doc you linked it says "SimpleSqlalchemyDatasource supports a number of configuration options to assist you with the introspection of your SQL database:" and it does not look like you're using a SimpleSqlalchemyDatasource. Not sure if that's the issue, but it may be part of it, and may be why it's doing the full scan still.

I have not used the SimpleSqlalchemyDatasource before, but you may need to use that instead of Datasource in the line you have now in your datasource config:

`"class_name": "Datasource",`
from airflow-provider-great-expectations.
@zhangchi1 something I'm seeing in this latest set of configs: first, in the datasource config you have `"table_name": '"MY_TABLE"',` with double quotes around the table name (and I don't think you need the `table_name` param at all); then in the checkpoint config you have `"data_asset_name": 'MY_TABLE',` with no double quotes. I think this may be causing the table from the `data_asset_name` to not be looked up correctly.
@talagluck might be able to help here as well.

The `data_asset_name` should be just the table name in this case. Which version of Great Expectations are you using? I see you're using a newer version of the provider. You can also try using the default checkpoint the provider builds for you and see if that works. One issue may be that you have a different connection in your Airflow Connections than in your GE datasource; this will be resolved if you let the operator build a connection for you.
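To illustrate the advice above, here is a minimal sketch of what the `batch_request` shape would look like under that guidance. The datasource and data connector names are the ones used elsewhere in this thread, not required values; the point is only that `data_asset_name` is the bare table name.

```python
# Hypothetical batch_request sketch based on the advice above; the datasource
# and data connector names are the ones used elsewhere in this thread.
batch_request = {
    "datasource_name": "my_snowflake_datasource",
    "data_connector_name": "default_configured_asset_data_connector_name",
    # Just the table name: no database/schema prefix, no double quotes.
    "data_asset_name": "MY_TABLE",
}
```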
I noticed in the doc you linked it says "SimpleSqlalchemyDatasource supports a number of configuration options to assist you with the introspection of your SQL database:" and it does not look like you're using a SimpleSqlalchemyDatasource. Not sure if that's the issue, but it may be part of it, and may be why it's doing the full scan still.

Also in your CheckpointConfig, in the batch_request, I'd try removing the `.lower()` in the `data_asset_name` and removing the `"data_connector_query"`; neither seems to be needed. (Just trying to see any potential issues here, I know I've had problems with the config files before.)
Yeah, that documentation is fine for the operator, too. I just saw that the other doc you linked specifically showcased SimpleSqlAlchemyDatasource, and that's why I'm also asking which version of Great Expectations proper you're running. You may have to upgrade that to use the other datasource. I'd also ask that specific question about tables on the GX Slack; you'll likely get a more detailed answer there.
Hi @zhangchi1 - my mistake. It sounds like the issue is most likely not a lowercase table name issue.

I'm not sure what you mean by this:

> I tried the above configs, but still got the KeyError. @talagluck I'm wondering if the issue could be great-expectations/great_expectations#6260? Since the great_expectations Airflow operator hasn't upgraded with the latest great_expectations version?

What version of great_expectations are you running? More importantly, do you get the same error when running this outside of Airflow with the same configs?
In your datasource config, I've never used this line before:

`"included_tables": f"{schema}.test_sf_table".lower()`

so it may not be needed / may be creating an issue with the `data_asset_name`.

Also, I'd double-check that your table exists, and that your role has access to that table!
Hi @denimalpaca, thank you so much for your quick response.

- I specified `included_tables` because I only want to run some validation on my test_sf_table, and I read this doc: "specifying included_tables will have the effect of including only the tables on this list, while excluding the rest" https://docs.greatexpectations.io/docs/guides/connecting_to_your_data/how_to_configure_a_dataconnector_to_introspect_and_partition_tables_in_sql/#2-customize-the-introspection-configuration-to-fit-your-needs
- I double-checked my Snowflake account, and there is a table named "test_sf_table" under my database and schema.

In addition, I want to point out that the operator was able to scan the Snowflake tables before throwing the KeyError. I'm also not sure why it scanned all tables, as I already set `included_tables` in my `sf_datasource_config`?
```
[2022-11-21, 22:03:19 EST] {connection.py:274} INFO - Snowflake Connector for Python Version: 2.8.0, Python Version: 3.9.15, Platform: Linux-5.10.76-linuxkit-x86_64-with-glibc2.31
[2022-11-21, 22:03:19 EST] {connection.py:933} INFO - This connection is in OCSP Fail Open Mode. TLS Certificates would be checked for validity and revocation status. Any other Certificate Revocation related exceptions or OCSP Responder failures would be disregarded in favor of connectivity.
[2022-11-21, 22:03:19 EST] {connection.py:951} INFO - Setting use_openssl_only mode to False
[2022-11-21, 22:03:21 EST] {cursor.py:714} INFO - query: [select current_database(), current_schema();]
[2022-11-21, 22:03:21 EST] {cursor.py:738} INFO - query execution done
[2022-11-21, 22:03:21 EST] {cursor.py:854} INFO - Number of results in first chunk: 1
```
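For reference, here is a hedged sketch of how the introspection doc quoted above suggests `included_tables` is meant to be used: as a directive on an inferred-asset data connector, limiting introspection to the listed tables. All names and the connection URL here are placeholders, and the exact shape is an assumption to check against the GE docs, not a verified config.

```python
# Hedged sketch: per the introspection doc quoted above, included_tables on an
# InferredAssetSqlDataConnector limits introspection to the listed tables.
# All names and the URL here are placeholders, not verified values.
sf_url = "snowflake://<user>:<password>@<account>/<database>/<schema>"

sf_datasource_config = {
    "class_name": "Datasource",
    "module_name": "great_expectations.datasource",
    "execution_engine": {
        "class_name": "SqlAlchemyExecutionEngine",
        "connection_string": sf_url,
    },
    "data_connectors": {
        "inferred_data_connector": {
            "class_name": "InferredAssetSqlDataConnector",
            # The doc's wording ("the tables on this list") suggests a list,
            # whereas the config in this thread passed a single string.
            "included_tables": ["my_schema.test_sf_table"],
        }
    },
}
```

Note the doc's phrasing "the tables on this list", which suggests `included_tables` takes a list; the config in this thread passed a single lowercased string, which may be part of why introspection did not behave as expected.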
> In your datasource config, I've never used this line before:
> `"included_tables": f"{schema}.test_sf_table".lower()`
> so it may not be needed / may be creating an issue with the `data_asset_name`. Also, I'd double-check that your table exists, and that your role has access to that table!

I just removed `included_tables`, but still got the same error:

`KeyError: 'data_asset_name ... is not recognized.'`

I'm wondering what the `data_asset_name` is - is it just a table name, or does it have to be `{database}.{schema}.{table}`? Thank you ~
hi @denimalpaca thanks a lot for your clarification.

I tried

```python
sf_datasource_config = {
    "class_name": "SimpleSqlalchemyDatasource",
    ...}
```

but got a `great_expectations.exceptions.exceptions.DatasourceError: Cannot initialize datasource` error. I was mainly following this doc to connect my Snowflake database: https://docs.greatexpectations.io/docs/guides/connecting_to_your_data/database/snowflake. I'm not sure if the documentation is the same for the operator. Do you suggest a downgrade? I'm using airflow-provider-great-expectations==0.2.0
hi @denimalpaca, thanks again. Just want to keep you updated that I tried ConfiguredAssetSqlDataConnector based on this doc: https://docs.greatexpectations.io/docs/guides/connecting_to_your_data/datasource_configuration/how_to_configure_a_sql_datasource/#7-configure-your-individual-data-connectors-splitting-sampling-etc

```python
sf_datasource_config = {
    "class_name": "Datasource",
    "module_name": "great_expectations.datasource",
    "execution_engine": {
        "class_name": "SqlAlchemyExecutionEngine",
        "connection_string": sf_url,
    },
    "data_connectors": {
        "default_configured_asset_data_connector_name": {
            "class_name": "ConfiguredAssetSqlDataConnector",
            "include_schema_name": True,
            "schema_name": "schema",
            "table_name": "table",
        }
    },
}
```

I removed the other data connectors and was able to stop all of those scans. However, I still got the same KeyError. Sent a message in the GE Slack channel; waiting for their response.
Hi @zhangchi1 - there is a known bug around table names in Snowflake that were made explicitly lowercase: Snowflake defaults to uppercase while SqlAlchemy defaults to lowercase, and Snowflake makes it somewhat difficult to create and access lowercase tables (you need to wrap them in double quotes).

When you perform simple SELECT statements on the table in your Snowflake environment, do you need to wrap the table name in double quotes, e.g. `MY_DB.MY_SCHEMA."test_sf_table"`?
hi @talagluck, I tried it in my Snowflake console UI and either way works (my table is created with uppercase). But I usually perform the select query `SELECT * FROM "MY_DB"."MY_SCHEMA"."MY_TABLE"`.

Are you saying I should do

```python
sf_datasource_config = {
    "class_name": "Datasource",
    "module_name": "great_expectations.datasource",
    "execution_engine": {
        "class_name": "SqlAlchemyExecutionEngine",
        "connection_string": sf_url,
    },
    "data_connectors": {
        "default_configured_asset_data_connector_name": {
            "class_name": "ConfiguredAssetSqlDataConnector",
            "include_schema_name": True,
            "schema_name": '"MY_SCHEMA"',
            "table_name": '"MY_TABLE"',
        }
    },
}
```
and in the checkpoint config I should do:

```python
snowflake_checkpoint_config = CheckpointConfig(
    **{
        "name": "test_sf_checkpoint",
        "config_version": 1.0,
        "template_name": None,
        "module_name": "great_expectations.checkpoint",
        "class_name": "Checkpoint",
        "run_name_template": "%Y%m%d-%H%M%S-test-sf-checkpoint",
        "expectation_suite_name": "sf_test.demo",
        "action_list": [
            {
                "name": "store_validation_result",
                "action": {"class_name": "StoreValidationResultAction"},
            },
            {
                "name": "store_evaluation_params",
                "action": {"class_name": "StoreEvaluationParametersAction"},
            },
            {
                "name": "update_data_docs",
                "action": {"class_name": "UpdateDataDocsAction", "site_names": []},
            },
        ],
        "evaluation_parameters": {},
        "runtime_configuration": {},
        "validations": [
            {
                "batch_request": {
                    "datasource_name": "my_snowflake_datasource",
                    "data_connector_name": "default_configured_asset_data_connector_name",
                    "data_asset_name": '"MY_TABLE"',
                    "data_connector_query": {"index": -1},
                },
            }
        ],
        "profilers": [],
        "ge_cloud_id": None,
        "expectation_suite_ge_cloud_id": None,
    }
)
```

I tried the above configs, but still got the KeyError. @talagluck I'm wondering if the issue could be great-expectations/great_expectations#6260? Since the great_expectations Airflow operator hasn't upgraded with the latest great_expectations version? Thanks!
hi @talagluck we are using airflow-provider-great-expectations 0.2.1, and I believe it uses great-expectations>=0.13.14. We are also testing the same configs via pure GX outside of Airflow; will keep you posted. Thanks a lot for your continuous support!
Hi @talagluck, we are getting the same error with great-expectations Version: 0.15.34.

Here are the configs for your reference; please feel free to let me know if you find any issues. Thank you so much ~

```python
sf_datasource = DatasourceConfig(
    class_name="Datasource",
    execution_engine={
        "class_name": "SqlAlchemyExecutionEngine",
        "connection_string": sf_url,
    },
    data_connectors={
        "default_configured_asset_data_connector_name": {
            "class_name": "ConfiguredAssetSqlDataConnector",
            "include_schema_name": True,
            "schema_name": "MY_SCHEMA",
            "table_name": '"MY_TABLE"',
        },
    },
)

sf_data_context = DataContextConfig(
    config_version=3.0,
    datasources={"my_snowflake_datasource": sf_datasource},
    stores={
        "expectations_store": {
            "class_name": "ExpectationsStore",
            "store_backend": {
                "class_name": "TupleFilesystemStoreBackend",
                "base_directory": os.path.join(ge_root_dir, "expectations"),
            },
        },
        "validations_store": {
            "class_name": "ValidationsStore",
            "store_backend": {
                "class_name": "TupleFilesystemStoreBackend",
                "base_directory": os.path.join(
                    ge_root_dir, "uncommitted", "validations"
                ),
            },
        },
        "evaluation_parameter_store": {"class_name": "EvaluationParameterStore"},
        "checkpoint_store": {
            "class_name": "CheckpointStore",
            "store_backend": {
                "class_name": "TupleFilesystemStoreBackend",
                "suppress_store_backend_id": True,
                "base_directory": os.path.join(ge_root_dir, "checkpoints"),
            },
        },
    },
    expectations_store_name="expectations_store",
    validations_store_name="validations_store",
    evaluation_parameter_store_name="evaluation_parameter_store",
    checkpoint_store_name="checkpoint_store",
    data_docs_sites={
        "local_site": {
            "class_name": "SiteBuilder",
            "show_how_to_buttons": True,
            "store_backend": {
                "class_name": "TupleFilesystemStoreBackend",
                "base_directory": os.path.join(
                    ge_root_dir, "uncommitted", "data_docs", "local_site"
                ),
            },
            "site_index_builder": {"class_name": "DefaultSiteIndexBuilder"},
        }
    },
    anonymous_usage_statistics={
        "data_context_id": "abcdabcd-1111-2222-3333-abcdabcdabcd",
        "enabled": True,
    },
    notebooks=None,
    concurrency={"enabled": False},
)

context = BaseDataContext(project_config=sf_data_context)

sf_checkpoint = Checkpoint(
    data_context=context,
    **{
        "name": "test_sf_checkpoint",
        "config_version": 1.0,
        "template_name": None,
        "run_name_template": "%Y%m%d-%H%M%S-test-sf-checkpoint",
        "expectation_suite_name": "sf_test.demo",
        "action_list": [
            {
                "name": "store_validation_result",
                "action": {"class_name": "StoreValidationResultAction"},
            },
            {
                "name": "store_evaluation_params",
                "action": {"class_name": "StoreEvaluationParametersAction"},
            },
            {
                "name": "update_data_docs",
                "action": {"class_name": "UpdateDataDocsAction", "site_names": []},
            },
        ],
        "evaluation_parameters": {},
        "runtime_configuration": {},
        "validations": [
            {
                "batch_request": {
                    "datasource_name": "my_snowflake_datasource",
                    "data_connector_name": "default_configured_asset_data_connector_name",
                    "data_asset_name": "MY_TABLE",
                    # "data_connector_query": {"index": -1},
                },
            }
        ],
    },
)

results = sf_checkpoint.run()
```
Hi @zhangchi1 , was this issue ever resolved?
hi @denimalpaca, thanks a lot for reminding me. Yes, I believe adding an "assets" field inside the "data_connectors" section of the "datasource_config" resolved the issue. I think we can close this issue.
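For future readers, here is a minimal sketch of what that fix might look like, assuming the connector and placeholder names used earlier in this thread; each key under `assets` becomes a `data_asset_name` that the checkpoint's `batch_request` can reference. Treat the exact shape as an assumption to verify against the GE docs, not a confirmed configuration.

```python
# Hedged sketch of the resolution: declare the table under an "assets" mapping
# inside the ConfiguredAssetSqlDataConnector. All names and the URL are this
# thread's placeholders, not required values.
sf_url = "snowflake://<user>:<password>@<account>/<database>/<schema>"

sf_datasource_config = {
    "class_name": "Datasource",
    "module_name": "great_expectations.datasource",
    "execution_engine": {
        "class_name": "SqlAlchemyExecutionEngine",
        "connection_string": sf_url,
    },
    "data_connectors": {
        "default_configured_asset_data_connector_name": {
            "class_name": "ConfiguredAssetSqlDataConnector",
            "assets": {
                # Key = data_asset_name used in the checkpoint's batch_request.
                "MY_TABLE": {
                    "table_name": "MY_TABLE",
                    "schema_name": "MY_SCHEMA",
                }
            },
        }
    },
}
```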