
tomasfarias / airflow-dbt-python


A collection of Airflow operators, hooks, and utilities to elevate dbt to a first-class citizen of Airflow.

Home Page: https://airflow-dbt-python.readthedocs.io

License: MIT License

Languages: Python 100.00%
Topics: airflow, python3, dbt, python, airflow-operators, analytics, data-engineering, airflow-hook

airflow-dbt-python's People

Contributors

adamantike, blackbass64, dpinedaj, marcusintrohive, millin, tomasfarias



airflow-dbt-python's Issues

ModuleNotFoundError: No module named 'hologram'

Airflow version: 2.5.2
Airflow runs in EKS.
Installed dbt versions (Dockerfile excerpt below):

USER airflow
RUN python3 -m pip install --user \
    -c "https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-source-providers-${PYTHON_RUNTIME_VERSION}.txt" \
    apache-airflow-providers-amazon \
    apache-airflow-providers-ftp \
    apache-airflow-providers-http \
    apache-airflow-providers-imap \
    apache-airflow-providers-sqlite \
    apache-airflow-providers-snowflake \
    apache-airflow-providers-mysql \
    apache-airflow-providers-microsoft-mssql \
    apache-airflow-providers-odbc \
    apache-airflow-providers-mongo \
    apache-airflow-providers-opsgenie \
    apache-airflow-providers-slack \
    apache-airflow-providers-postgres \
    apache-airflow-providers-common-sql \
    apache-airflow-providers-ssh \
    requests-ntlm \
    rpy2 
    
RUN python3 -m pip install --user \
    dbt-core==1.4.5 \
    --no-deps \
    dbt-snowflake==1.4.1 \
    airflow-dbt-python[snowflake]
clone_repo = BashOperator(
    task_id='clone_repo',
    bash_command='git clone https://$gitlabUser:[email protected]/abc.git /opt/airflow/dbt/',
)

dbt_run = DbtRunOperator(
    task_id="dbt_run",
    project_dir="/opt/airflow/dbt/",
    profiles_dir="~/.dbt/",
    select=["tag:staging"],
    exclude=["tag:deprecated"],
    target="dev",
    profile="profiles.yml",
    full_refresh=False,
    threads=100,
    fail_fast=True,
)

Error

 Reading local file: /opt/airflow/logs/dag_id=dbt_poc/run_id=manual__2023-03-28T20:22:13.528630+00:00/task_id=dbt_run/attempt=1.log
[2023-03-28, 20:22:33 UTC] {taskinstance.py:1084} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: dbt_poc.dbt_run manual__2023-03-28T20:22:13.528630+00:00 [queued]>
[2023-03-28, 20:22:33 UTC] {taskinstance.py:1084} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: dbt_poc.dbt_run manual__2023-03-28T20:22:13.528630+00:00 [queued]>
[2023-03-28, 20:22:33 UTC] {taskinstance.py:1282} INFO - 
--------------------------------------------------------------------------------
[2023-03-28, 20:22:33 UTC] {taskinstance.py:1283} INFO - Starting attempt 1 of 1
[2023-03-28, 20:22:33 UTC] {taskinstance.py:1284} INFO - 
--------------------------------------------------------------------------------
[2023-03-28, 20:22:33 UTC] {taskinstance.py:1303} INFO - Executing <Task(DbtRunOperator): dbt_run> on 2023-03-28 20:22:13.528630+00:00
[2023-03-28, 20:22:33 UTC] {standard_task_runner.py:55} INFO - Started process 27 to run task
[2023-03-28, 20:22:34 UTC] {standard_task_runner.py:82} INFO - Running: ['airflow', 'tasks', 'run', 'dbt_poc', 'dbt_run', 'manual__2023-03-28T20:22:13.528630+00:00', '--job-id', '5', '--raw', '--subdir', 'DAGS_FOLDER/dbt/dbt_poc.py', '--cfg-path', '/tmp/tmpwucq60ra']
[2023-03-28, 20:22:34 UTC] {standard_task_runner.py:83} INFO - Job 5: Subtask dbt_run
[2023-03-28, 20:22:34 UTC] {task_command.py:388} INFO - Running <TaskInstance: dbt_poc.dbt_run manual__2023-03-28T20:22:13.528630+00:00 [running]> on host dbt-poc-dbt-run-275230a903aa4bb491974e4f01edbc13
[2023-03-28, 20:22:34 UTC] {configuration.py:1141} WARNING - Ignoring unknown env var 'AIRFLOW__KUBERNETES_SECRETS__gitlabToken'
[2023-03-28, 20:22:34 UTC] {configuration.py:1141} WARNING - Ignoring unknown env var 'AIRFLOW__KUBERNETES_SECRETS__gitlabUser'
[2023-03-28, 20:22:34 UTC] {pod_generator.py:424} WARNING - Model file  does not exist
[2023-03-28, 20:22:34 UTC] {taskinstance.py:1509} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=dbt_poc
AIRFLOW_CTX_TASK_ID=dbt_run
AIRFLOW_CTX_EXECUTION_DATE=2023-03-28T20:22:13.528630+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=manual__2023-03-28T20:22:13.528630+00:00
[2023-03-28, 20:22:34 UTC] {dbt.py:165} INFO - Running dbt task: run
[2023-03-28, 20:22:34 UTC] {dbt.py:176} ERROR - An error has ocurred while executing dbt run
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow_dbt_python/operators/dbt.py", line 170, in execute
    result = self.dbt_hook.run_dbt_task(
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow_dbt_python/hooks/dbt.py", line 199, in run_dbt_task
    from dbt.adapters.factory import register_adapter
  File "/home/airflow/.local/lib/python3.10/site-packages/dbt/adapters/factory.py", line 8, in <module>
    from dbt.adapters.base.plugin import AdapterPlugin
  File "/home/airflow/.local/lib/python3.10/site-packages/dbt/adapters/base/__init__.py", line 4, in <module>
    from dbt.contracts.connection import Credentials  # noqa
  File "/home/airflow/.local/lib/python3.10/site-packages/dbt/contracts/connection.py", line 15, in <module>
    from dbt.exceptions import DbtInternalError
  File "/home/airflow/.local/lib/python3.10/site-packages/dbt/exceptions.py", line 6, in <module>
    from dbt.dataclass_schema import ValidationError
  File "/home/airflow/.local/lib/python3.10/site-packages/dbt/dataclass_schema.py", line 12, in <module>
    from hologram import JsonSchemaMixin, FieldEncoder, ValidationError
ModuleNotFoundError: No module named 'hologram'
[2023-03-28, 20:22:34 UTC] {taskinstance.py:1775} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow_dbt_python/operators/dbt.py", line 170, in execute
    result = self.dbt_hook.run_dbt_task(
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow_dbt_python/hooks/dbt.py", line 199, in run_dbt_task
    from dbt.adapters.factory import register_adapter
  File "/home/airflow/.local/lib/python3.10/site-packages/dbt/adapters/factory.py", line 8, in <module>
    from dbt.adapters.base.plugin import AdapterPlugin
  File "/home/airflow/.local/lib/python3.10/site-packages/dbt/adapters/base/__init__.py", line 4, in <module>
    from dbt.contracts.connection import Credentials  # noqa
  File "/home/airflow/.local/lib/python3.10/site-packages/dbt/contracts/connection.py", line 15, in <module>
    from dbt.exceptions import DbtInternalError
  File "/home/airflow/.local/lib/python3.10/site-packages/dbt/exceptions.py", line 6, in <module>
    from dbt.dataclass_schema import ValidationError
  File "/home/airflow/.local/lib/python3.10/site-packages/dbt/dataclass_schema.py", line 12, in <module>
    from hologram import JsonSchemaMixin, FieldEncoder, ValidationError
ModuleNotFoundError: No module named 'hologram'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow_dbt_python/operators/dbt.py", line 179, in execute
    raise AirflowException(
airflow.exceptions.AirflowException: An error has occurred while executing dbt run
[2023-03-28, 20:22:34 UTC] {taskinstance.py:1321} INFO - Marking task as FAILED. dag_id=dbt_poc, task_id=dbt_run, execution_date=20230328T202213, start_date=20230328T202233, end_date=20230328T202234
[2023-03-28, 20:22:34 UTC] {standard_task_runner.py:100} ERROR - Failed to execute job 5 for task dbt_run (An error has occurred while executing dbt run; 27)
[2023-03-28, 20:22:34 UTC] {local_task_job.py:212} INFO - Task exited with return code 1
[2023-03-28, 20:22:34 UTC] {taskinstance.py:2585} INFO - 0 downstream tasks scheduled from follow-on schedule check
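One detail worth noting about the Dockerfile above: pip applies --no-deps to the whole install command, so the second RUN likely skips dbt-core's own dependencies, hologram among them, which would explain the ModuleNotFoundError. A quick diagnostic sketch, assuming it is run with the image's Python interpreter:

# Check whether dbt-core's transitive dependencies are importable on the worker.
import importlib.util

for module in ("dbt", "hologram", "mashumaro", "agate"):
    spec = importlib.util.find_spec(module)
    print(f"{module}: {'found' if spec else 'MISSING'}")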

Debug logs are on by default and cannot be turned off

Running on MWAA 2.2.2
I upgraded to airflow-dbt-python 0.15.2 and dbt-redshift==1.2.1. Prior to the upgrade, logs appeared like this:

[2022-12-01, 11:00:09 UTC] {{logging_mixin.py:109}} INFO - 11:00:09 | Concurrency: 4 threads (target='prod')
[2022-12-01, 11:00:09 UTC] {{logging_mixin.py:109}} INFO - 11:00:09 |
[2022-12-01, 11:00:09 UTC] {{logging_mixin.py:109}} INFO - 11:00:09 | 1 of 15 START table model ***.................... [RUN]
[2022-12-01, 11:00:09 UTC] {{logging_mixin.py:109}} INFO - 11:00:09 | 2 of 15 START table model ****.......................... [RUN]
[2022-12-01, 11:00:09 UTC] {{logging_mixin.py:109}} INFO - 11:00:09 | 3 of 15 START table model ***................. [RUN]

After the upgrade,

[2022-12-02, 05:48:58 UTC] {{functions.py:230}} DEBUG - 05:48:58.678976 [debug] [MainThread]: Partial parsing not enabled
[2022-12-02, 05:48:58 UTC] {{functions.py:230}} DEBUG - 05:48:58.691536 [debug] [MainThread]: Parsing macros/generate_schema_name.sql
[2022-12-02, 05:48:58 UTC] {{functions.py:230}} DEBUG - 05:48:58.692644 [debug] [MainThread]: Parsing macros/adapters.sql
[2022-12-02, 05:48:58 UTC] {{functions.py:230}} DEBUG - 05:48:58.710620 [debug] [MainThread]: Parsing macros/catalog.sql
[2022-12-02, 05:48:58 UTC] {{functions.py:230}} DEBUG - 05:48:58.718226 [debug] [MainThread]: Parsing macros/relations.sql
[2022-12-02, 05:48:58 UTC] {{functions.py:230}} DEBUG - 05:48:58.718708 [debug] [MainThread]: Parsing macros/adapters/apply_grants.sql
[2022-12-02, 05:48:58 UTC] {{functions.py:230}} DEBUG - 05:48:58.719046 [debug] [MainThread]: Parsing macros/utils/datediff.sql
[2022-12-02, 05:48:58 UTC] {{functions.py:230}} DEBUG - 05:48:58.719490 [debug] [MainThread]: Parsing macros/utils/listagg.sql
[2022-12-02, 05:48:58 UTC] {{functions.py:230}} DEBUG - 05:48:58.721693 [debug] [MainThread]: Parsing macros/utils/split_part.sql

Not sure if this is due to dbt or airflow-dbt-python.
Since the filename is functions.py (previously logging_mixin.py), I assume this is due to the dbt upgrade. But when I run dbt from the command line, I do not see these debug logs, so I wanted to check.

I've tried

config:
    debug: False

in profiles.yml, but this doesn't seem to help.

Extras installation is broken since v1.0.0

Description

With the changes introduced in v1.0.0 (specifically, in commit 06e6025), specifying this package's extras does not install the optional dependencies.

It gets fixed when adding these lines back to the [tool.poetry.dependencies] section in pyproject.toml:

dbt-postgres = { version = ">=1.0.0", optional = true }
dbt-redshift = { version = ">=1.0.0", optional = true }
dbt-snowflake = { version = ">=1.0.0", optional = true }
dbt-bigquery = { version = ">=1.0.0", optional = true }
dbt-spark = { version = ">=1.0.0", optional = true }

This makes sense according to Poetry documentation:

The dependencies specified for each extra must already be defined as project dependencies.
Dependencies listed in dependency groups cannot be specified as extras.

However, I haven't created a PR for this, because I see that commit also created a Poetry adapters group, which could get broken with this change. Related Poetry issue: python-poetry/poetry#4891 (comment)

Steps to reproduce

  1. Create a new virtualenv.
  2. Run pip install 'airflow-dbt-python[snowflake] == 1.0.0'.
  3. Check for the presence of dbt-snowflake, by running pip freeze | grep dbt-snowflake.

Additional context

Installing version 0.15.3 correctly installs provided extras:

$ pip install 'airflow-dbt-python[snowflake] == 0.15.3'
(...)
Successfully installed Babel-2.12.1 Jinja2-3.1.2 MarkupSafe-2.1.2 SecretStorage-3.3.3 agate-1.7.0 airflow-dbt-python-0.15.3 asn1crypto-1.5.1 attrs-22.2.0 betterproto-1.2.5 certifi-2022.12.7 cffi-1.15.1 charset-normalizer-2.1.1 click-8.1.3 colorama-0.4.6 cryptography-39.0.2 dbt-core-1.4.5 dbt-extractor-0.4.1 dbt-snowflake-1.4.2 filelock-3.10.0 future-0.18.3 grpclib-0.4.3 h2-4.1.0 hologram-0.0.15 hpack-4.0.0 hyperframe-6.0.1 idna-3.4 importlib-metadata-6.1.0 isodate-0.6.1 jaraco.classes-3.2.3 jeepney-0.8.0 jsonschema-3.2.0 keyring-23.13.1 leather-0.3.4 logbook-1.5.3 mashumaro-3.3.1 minimal-snowplow-tracker-0.0.2 more-itertools-9.1.0 msgpack-1.0.5 multidict-6.0.4 networkx-2.8.8 oscrypto-1.3.0 packaging-23.0 parsedatetime-2.4 pathspec-0.10.3 pyOpenSSL-23.0.0 pycparser-2.21 pycryptodomex-3.17 pyjwt-2.6.0 pyrsistent-0.19.3 python-dateutil-2.8.2 python-slugify-8.0.1 pytimeparse-1.1.8 pytz-2022.7.1 pyyaml-6.0 requests-2.28.2 six-1.16.0 snowflake-connector-python-3.0.1 sqlparse-0.4.3 stringcase-1.2.0 text-unidecode-1.3 typing-extensions-4.5.0 urllib3-1.26.15 werkzeug-2.2.3 zipp-3.15.0

$ pip freeze | grep dbt-snowflake
dbt-snowflake==1.4.2

For version 1.0.0, that's no longer the case:

$ pip install 'airflow-dbt-python[snowflake] == 1.0.0'
(...)
Successfully installed Babel-2.12.1 Jinja2-3.1.2 MarkupSafe-2.1.2 agate-1.7.0 airflow-dbt-python-1.0.0 attrs-22.2.0 betterproto-1.2.5 certifi-2022.12.7 cffi-1.15.1 charset-normalizer-3.1.0 click-8.1.3 colorama-0.4.6 dbt-core-1.4.5 dbt-extractor-0.4.1 future-0.18.3 grpclib-0.4.3 h2-4.1.0 hologram-0.0.15 hpack-4.0.0 hyperframe-6.0.1 idna-3.4 isodate-0.6.1 jsonschema-3.2.0 leather-0.3.4 logbook-1.5.3 mashumaro-3.3.1 minimal-snowplow-tracker-0.0.2 msgpack-1.0.5 multidict-6.0.4 networkx-2.8.8 packaging-23.0 parsedatetime-2.4 pathspec-0.10.3 pycparser-2.21 pyrsistent-0.19.3 python-dateutil-2.8.2 python-slugify-8.0.1 pytimeparse-1.1.8 pytz-2022.7.1 pyyaml-6.0 requests-2.28.2 six-1.16.0 sqlparse-0.4.3 stringcase-1.2.0 text-unidecode-1.3 typing-extensions-4.5.0 urllib3-1.26.15 werkzeug-2.2.3

$ pip freeze | grep dbt-snowflake
(no output)

[Feature] Use Airflow connections instead of a profiles.yml file

Each target in the dbt profiles.yml file essentially works the same as an Airflow connection, as it specifies the connection parameters for the target.

If we could just pull the parameters from a connection instead of relying on a profiles.yml file, then this would remove the need to maintain a profiles.yml file at all, and it opens the door for any backend that can be used with Airflow connections.

The naive way to do this is pulling the parameters from a connection and creating our own profiles.yml at runtime, but I'd like to explore if we can save the file creation and simply provide the parameters to the appropriate dbt methods.
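As an illustration only, not the project's implementation, a minimal sketch of what mapping an Airflow connection to a dbt target could look like; the field mapping is an assumption:

from airflow.hooks.base import BaseHook  # Airflow >= 2.0

def target_from_connection(conn_id: str) -> dict:
    """Build a dbt target dictionary from an Airflow connection."""
    conn = BaseHook.get_connection(conn_id)
    target = {
        "type": conn.conn_type,  # e.g. "postgres", "redshift", "snowflake"
        "host": conn.host,
        "user": conn.login,
        "password": conn.password,
        "port": conn.port,
        "dbname": conn.schema,  # how schema/dbname map is adapter-specific
        "threads": 1,
    }
    # Extra dbt parameters (threads, sslmode, ...) could ride along in the extras.
    target.update(conn.extra_dejson)
    return target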

Add support for dbt version 0.21.0

The release of dbt version 0.21.0 comes with a lot of goodies, in particular, the following ones should be supported by us:

  • dbt source snapshot-freshness renamed to dbt source freshness.
  • New artifact schemas.
  • New dbt build command.
  • --output-keys parameter for dbt ls.
  • Switch to --select instead of --models in all commands. We should keep the models parameter for backwards compatibility though (maybe joining the two?).
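A sketch of the "maybe joining the two" idea from the last bullet, with hypothetical names:

def merge_select_and_models(select=None, models=None):
    """Accept both parameters and merge models into select for older callers."""
    merged = list(select or []) + list(models or [])
    return merged or None

# merge_select_and_models(select=["tag:staging"], models=["my_model"])
# -> ["tag:staging", "my_model"]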

Connection failures cause an UnboundLocalError

When timing out attempting to connect to a database, dbt logs a DatabaseError, like so:

Database Error
  connection to server at "<server>" (<ip>), port <port> failed: timeout expired

dbt must be doing some exception handling under the hood, as this causes the results variable here: https://github.com/tomasfarias/airflow-dbt-python/blob/master/airflow_dbt_python/hooks/dbt.py#L580 to be unbound.

We should account for the possibility of no results/results being unbound.
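A minimal sketch of that guard, with hypothetical names rather than the hook's actual code:

from airflow.exceptions import AirflowException

def run_task_safely(task, logger):
    """Ensure results is always bound, even if dbt fails before producing any."""
    results = None
    try:
        results = task.run()  # stand-in for however the hook invokes dbt
    except Exception:
        logger.exception("dbt raised before producing any results")
    if results is None:
        raise AirflowException("dbt produced no results; check the connection settings")
    return results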

Add support for Python 3.10

Now that dbt-core 1.1 is out, and we completed #54, we should try out Python 3.10. We may hold this off until dbt-core drops support for an older Python version, due to the added maintenance load of supporting 4 Python versions.

Implement integration testing pipeline for example DAGs

Issue #51 showed us that we are not covering the @dag syntax or the TaskFlow API in our testing pipeline.

Perhaps this could be a good excuse to design an integration testing pipeline that runs a few complete DAGs in an Airflow container. It would be great if we could validate our examples!

Check if git tag matches Python package tag before releasing

The release pipeline is mostly automated. The only manual steps are:

  • Bumping the pyproject.toml version using poetry version
  • Bumping the airflow_dbt_python.__version__ manually.
  • Tagging latest commit.
  • Pushing changes + tag.

However, I sometimes slip up, and we should have checks in place to prevent malformed releases.

In particular, this issue proposes adding a step to the Tagged release GitHub Action to check that the git tag matches airflow_dbt_python.__version__.
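A sketch of such a check, assuming it runs in the Tagged release workflow where GitHub Actions exposes the tag name via GITHUB_REF_NAME and tags carry a leading "v" (both assumptions):

import os
import sys

from airflow_dbt_python import __version__

tag = os.environ.get("GITHUB_REF_NAME", "").lstrip("v")
if tag != __version__:
    sys.exit(f"git tag {tag!r} does not match airflow_dbt_python.__version__ {__version__!r}")
print(f"git tag matches package version {__version__}")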

Push to S3: malformed URL

Hi there,

I'm testing out airflow-dbt-python==0.12.0 on AWS MWAA. It's working brilliantly with push_dbt_project=False, but fails when trying to push things back to S3 using push_dbt_project=True. I definitely have an IAM policy associated with MWAA that gives the correct PutObject permission for the bucket.

Looking at my log output, I think there's something wrong with the arguments being passed into load_file_handle_replace_error.

The log message written by the operator shows the correct target URL:

[2022-02-21, 03:24:01 UTC] {{dbt.py:258}} INFO - Pushing dbt project back to S3: s3://mwaa-test-bucket/dbt/project/dbt.zip

But when the boto3 S3UploadFailedError occurs later, the error output shows a malformed URL:

boto3.exceptions.S3UploadFailedError: Failed to upload /tmp/airflowtmptr4_9_un/dbt_project.zip to mwaa-test-bucket/s3://mwaa-test-bucket/dbt/project/dbt.zip: An error occurred (AccessDenied) when calling the PutObject operation: Access Denied

I had a quick look at the S3 hook code, and I think it may be to do with this bit:

s3_key = f"s3://{bucket_name}/{key}{_file.relative_to(project_dir)}"
self.load_file_handle_replace_error(
    filename=_file,
    key=s3_key,
    bucket_name=bucket_name,
    replace=replace,
)

As I understand it, what's happening here is that the full S3 URL is being passed in as the object key, where I think it should just be the object key itself (assuming that the underlying Airflow S3 hook code should be able to construct the final S3 URL using the supplied bucket name and key).

This probably explains why the logs have a malformed URL - it looks like it's concatenating the bucket name (mwaa-test-bucket/) and key (s3://mwaa-test-bucket/dbt/project/dbt.zip).
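A sketch of the suggested fix, with hypothetical names: build only the object key, without the "s3://{bucket}/" prefix, and let the Airflow S3 hook combine it with bucket_name itself.

from pathlib import Path

def object_key_for(project_dir: Path, file: Path, key_prefix: str) -> str:
    """Return only the object key; the S3 hook adds the bucket and scheme."""
    return f"{key_prefix}{file.relative_to(project_dir)}"

# object_key_for(Path("/tmp/project"), Path("/tmp/project/models/a.sql"), "dbt/project/")
# -> "dbt/project/models/a.sql"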

dbt_run_operation operator fails with no arg

I have a macro that I can trigger from the CLI with no arguments; however, it fails when triggered from Airflow via the DbtRunOperation operator. Is this expected behavior?

The error I get in the airflow logs is ERROR - 20:54:11 Encountered an error while running operation: a string or stream input is required

Not all default arguments are being passed to the DbtBuildOperator

Hey!

I noticed that the on_failure_callback from the default arguments is not being passed to the DbtBuildOperator, as well as the sla argument.

I can see from the "Instance Details" page that these arguments are being passed to my other tasks, but not to the airflow_dbt_python operators I bring in. I will try and come up with a PR for the fix, but if you have any guidance on where to start, that would be great!

Thanks

[Feature] Support pulling dbt artifacts from XCOM

Currently, airflow-dbt-python supports pushing artifacts to XCOM. However, we do not offer pulling artifacts from XCOM. This would be particularly useful for stateful runs, like when using the source_status:fresher+ selector method that came with dbt 1.1.

Error downloading dbt project files from s3

When passing an S3 URL to the project_dir parameter, DbtS3Hook tries to download the root folder (prefix) itself into the temp dir, which throws an exception.

With project_dir="s3://MYBUCKET.com/dbt"

...
[2021-12-14 20:15:22,510] {{dbt.py:259}} INFO - Fetching dbt project from S3: s3://MYBUCKET.com/dbt/
[2021-12-14 20:15:22,511] {{s3.py:82}} INFO - Downloading dbt project files from: s3://MYBUCKET.com/dbt/
...
[2021-12-14 20:15:24,950] {{s3.py:58}} INFO - Saving s3.Object(bucket_name='MYBUCKET.com', key='dbt/') file to: /tmp/airflowtmpimiwzxx7
[2021-12-14 20:15:24,952] {{taskinstance.py:1482}} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1138, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
    result = task_copy.execute(context=context)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/operators/dbt.py", line 147, in execute
    with self.dbt_directory() as dbt_dir:  # type: str
  File "/usr/lib64/python3.7/contextlib.py", line 112, in __enter__
    return next(self.gen)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/operators/dbt.py", line 234, in dbt_directory
    self.prepare_directory(tmp_dir)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/operators/dbt.py", line 262, in prepare_directory
    tmp_dir,
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/hooks/s3.py", line 112, in get_dbt_project
    bucket_name, s3_object_keys, local_project_dir, key_prefix
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/hooks/s3.py", line 131, in download_many_s3_keys
    self.download_one_s3_object(local_project_file, s3_object)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/hooks/s3.py", line 60, in download_one_s3_object
    with open(target, "wb+") as f:
IsADirectoryError: [Errno 21] Is a directory: '/tmp/airflowtmpimiwzxx7'

I wasn't able to figure out why airflow_dbt_python/hooks/s3.py, line 98:

s3_object_keys = self.list_keys(bucket_name=bucket_name, prefix=f"{key_prefix}")

Returns the prefix folder itself (dbt/) as one of the keys.

Then airflow_dbt_python/hooks/s3.py, line 55, fails to open the file as it is actually a folder:

with open(target, "wb+") as f:
    s3_object.download_fileobj(f)

By adding the following check after line 112, I was able to work around the error.

if s3_object_key == prefix:
    continue

No dbt_project.yml found when using S3 backend in MWAA

We are trying to set up airflow-dbt-python in MWAA (Airflow 2.2.2) and are getting "no dbt_project.yml found" even though the yml file is definitely in the S3 bucket.

Our dag, as an example:

from datetime import datetime, timedelta
from airflow.utils.dates import days_ago

from airflow import DAG

from airflow_dbt_python.operators.dbt import DbtDocsGenerateOperator

with DAG(
    dag_id="dbt_docs",
    schedule_interval="@daily",
    start_date=days_ago(1),
    catchup=False,
    dagrun_timeout=timedelta(minutes=60),
) as dag:

    dbt_docs = DbtDocsGenerateOperator(
        task_id="dbt_docs",
        project_dir="s3://xxx-bi-dbt/",
        profiles_dir="s3://xxx-bi-dbt/profiles/",
        profile="xxx",
    )

    dbt_docs

The airflow logs:

[2022-07-03, 08:06:05 UTC] {{dbt.py:271}} INFO - Initializing temporary directory: /tmp/airflowtmppgtpg7w3
[2022-07-03, 08:06:05 UTC] {{base.py:43}} INFO - Pulling dbt profiles file from: s3://xxx-bi-dbt/profiles/
[2022-07-03, 08:06:05 UTC] {{base_aws.py:401}} INFO - Airflow Connection: aws_conn_id=aws_default
[2022-07-03, 08:06:05 UTC] {{base_aws.py:190}} INFO - No credentials retrieved from Connection
[2022-07-03, 08:06:05 UTC] {{base_aws.py:93}} INFO - Creating session with aws_access_key_id=None region_name=None
[2022-07-03, 08:06:05 UTC] {{base_aws.py:168}} INFO - role_arn is None
[2022-07-03, 08:06:05 UTC] {{s3.py:202}} INFO - Downloading S3Object s3.Object(bucket_name='xxx-bi-dbt', key='profiles/profiles.yml') to: /tmp/airflowtmppgtpg7w3/profiles.yml
[2022-07-03, 08:06:05 UTC] {{base.py:67}} INFO - Pulling dbt project files from: s3://xxx-bi-dbt/
[2022-07-03, 08:06:05 UTC] {{base_aws.py:401}} INFO - Airflow Connection: aws_conn_id=aws_default
[2022-07-03, 08:06:05 UTC] {{base_aws.py:190}} INFO - No credentials retrieved from Connection
[2022-07-03, 08:06:05 UTC] {{base_aws.py:93}} INFO - Creating session with aws_access_key_id=None region_name=None
[2022-07-03, 08:06:05 UTC] {{base_aws.py:168}} INFO - role_arn is None
[2022-07-03, 08:06:05 UTC] {{dbt.py:177}} INFO - Running dbt configuration: GenerateTaskConfig(cls=<class 'dbt.task.generate.GenerateTask'>, project_dir='/tmp/airflowtmppgtpg7w3/', profiles_dir='/tmp/airflowtmppgtpg7w3/', profile='xxx', target=None, compiled_target=None, cache_selected_only=None, fail_fast=None, single_threaded=None, threads=None, use_experimental_parser=None, vars='{}\n', warn_error=None, log_format=None, log_cache_events=False, record_timing_info=None, debug=None, quiet=None, no_print=None, defer=None, partial_parse=False, use_colors=None, static_parser=None, version_check=None, send_anonymous_usage_stats=None, write_json=None, exclude=None, select=None, selector_name=None, state=None, models=None, compile=True, which='generate')
[2022-07-03, 08:06:05 UTC] {{dbt.py:182}} ERROR - There was an error executing dbt
Traceback (most recent call last):
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/operators/dbt.py", line 180, in execute
    success, results = self.dbt_hook.run_dbt_task(config)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/hooks/dbt.py", line 744, in run_dbt_task
    task, runtime_config = config.create_dbt_task(extra_target)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/hooks/dbt.py", line 279, in create_dbt_task
    runtime_config = self.create_runtime_config(extra_targets)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/hooks/dbt.py", line 307, in create_runtime_config
    project, profile = self.create_dbt_project_and_profile(extra_targets)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/hooks/dbt.py", line 329, in create_dbt_project_and_profile
    profile = self.create_dbt_profile(extra_targets)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/hooks/dbt.py", line 408, in create_dbt_profile
    self.profile_name, {}
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/hooks/dbt.py", line 350, in profile_name
    project_profile_name = self.partial_project.render_profile_name(
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/hooks/dbt.py", line 376, in partial_project
    project_root, verify_version=version_check
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/dbt/config/project.py", line 634, in partial_load
    verify_version=verify_version,
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/dbt/config/project.py", line 475, in from_project_root
    project_dict = _raw_project_from(project_root)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/dbt/config/project.py", line 160, in _raw_project_from
    "no dbt_project.yml found at expected path {}".format(project_yaml_filepath)
dbt.exceptions.DbtProjectError: Runtime Error
  no dbt_project.yml found at expected path /tmp/airflowtmppgtpg7w3/dbt_project.yml

Could you give us a hint here?

Override dbt logging for better memory usage

In order to copy the dbt logs over to the Airflow logger we use a bit of a hack: we force dbt to log to a temporary file and then write the contents of that file to the Airflow logger. This is similar to what Airflow's BashOperator does as it re-reads from the subprocess module: https://github.com/apache/airflow/blob/v1-10-stable/airflow/operators/bash_operator.py#L155

Unfortunately, since dbt logs quite a bit, this may lead the worker to hit its memory limit as the log file grows waiting to be read at the end. We should properly override the dbt logger with Airflow's logger so that we can directly log as we go, instead of having to rely on a file.
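A minimal sketch of that direction. It assumes dbt emits records through a standard-library logger named "dbt" (logger names vary across dbt versions, and older versions used logbook), so treat it as illustrative only:

import logging

def forward_dbt_logs(task_logger: logging.Logger) -> None:
    """Stream dbt log records straight to the Airflow task logger."""
    dbt_logger = logging.getLogger("dbt")
    dbt_logger.handlers.clear()  # drop the temporary file handler
    for handler in task_logger.handlers:
        dbt_logger.addHandler(handler)
    dbt_logger.setLevel(logging.INFO)
    dbt_logger.propagate = False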

Support for vars as a template_field

Hey! Great package so far. Wondering about supporting vars as a templated_field. Looking to be able to use the dag_conf to pass different vars when running various dbt commands.

Key pair authentication for long running dbt builds eventually is no longer able to authenticate

Hey! As our dbt builds have grown over time we are noticing occasionally they are failing to connect using key pair authentication.

The error we are seeing from dbt is below. Something to note: we are successfully able to run the bulk of our dbt build before this happens, so the key pair file does exist on the Airflow worker and we are successfully making the connection.

[2023-03-16 18:16:15,149] {{functions.py:236}} ERROR - 18:16:15  ('Could not deserialize key data. The data may be in an incorrect format, it may be encrypted with an unsupported algorithm, or it may be an unsupported key type (e.g. EC curves with explicit parameters).', [_OpenSSLErrorWithText(code=151584876, lib=9, reason=108, reason_text=b'error:0909006C:PEM routines:get_name:no start line')])

Recently we added connect_retries: 3 to our dbt_profile hoping that would resolve the issue, but unfortunately it did not.

We are using an older version of airflow-dbt-python and airflow which we plan to upgrade over the next couple weeks.

  • airflow-dbt-python[snowflake]==0.14.5
  • airflow 2.0.2

I see in #79 the support to use airflow connections was added. Wondering if you have any insight of how the connection would be created using a keypair authentication.

Currently we are setting up our private key using an airflow plugin.

We are running Airflow using MWAA.

The conn_id `fs_default` isn't defined

Airflow version: 2.5.2
Airflow runs in EKS.
Installed dbt versions:

dbt-core==1.4.5 
dbt-snowflake==1.4.1 
airflow-dbt-python[snowflake]

profiles.yml

abc_metrics:
  outputs:
    dev:
      account: abc
      database: DEV
      role: DBT-ROLE
      schema:ABC_METRICS
      threads: 1
      type: snowflake
      user: <first>.<last>@ABC.COM
      warehouse: WH_M
  target: dev

clone_repo = BashOperator(
    task_id='clone_repo',
    bash_command='git clone https://$gitlabUser:[email protected]/abc.git /opt/airflow/dbt/',
)

dbt_run = DbtRunOperator(
    task_id="dbt_run",
    project_dir="/opt/airflow/dbt/models",
    profiles_dir="~/.dbt/",
    select=["tag:staging"],
    exclude=["tag:deprecated"],
    target="dev",
    profile="profiles.yml",
    full_refresh=False,
    threads=100,
    fail_fast=True,
)
AIRFLOW_CTX_DAG_RUN_ID=manual__2023-03-29T12:39:22.474515+00:00
[2023-03-29, 12:39:43 UTC] {dbt.py:165} INFO - Running dbt task: run
[2023-03-29, 12:39:46 UTC] {dbt.py:319} INFO - Initializing temporary directory: /tmp/airflow_tmpne65x0vs
[2023-03-29, 12:39:46 UTC] {dbt.py:176} ERROR - An error has ocurred while executing dbt run
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow_dbt_python/hooks/dbt.py", line 148, in get_remote
    return self.remotes[(scheme, conn_id)]
KeyError: ('', None)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow_dbt_python/hooks/dbt.py", line 322, in dbt_directory
    project_dir, profiles_dir = self.prepare_directory(
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow_dbt_python/hooks/dbt.py", line 367, in prepare_directory
    project_dir_path = self.download_dbt_project(
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow_dbt_python/hooks/dbt.py", line 180, in download_dbt_project
    remote = self.get_remote(scheme, self.project_conn_id)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow_dbt_python/hooks/dbt.py", line 150, in get_remote
    remote = get_remote(scheme, conn_id)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow_dbt_python/hooks/remote.py", line 165, in get_remote
    remote = remote_cls()
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow_dbt_python/hooks/localfs.py", line 38, in __init__
    super().__init__(fs_conn_id)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/hooks/filesystem.py", line 38, in __init__
    conn = self.get_connection(conn_id)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/hooks/base.py", line 72, in get_connection
    conn = Connection.get_connection_from_secrets(conn_id)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/connection.py", line 435, in get_connection_from_secrets
    raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined")
airflow.exceptions.AirflowNotFoundException: The conn_id `fs_default` isn't defined

Also tried with airflow_conn_id; same error.
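A possible workaround sketch, assuming (as the traceback suggests) that the local filesystem remote falls back to the fs_default connection: register that connection once, mirroring the ORM pattern used in a later example on this page.

from airflow import settings
from airflow.models import Connection

session = settings.Session()
if session.query(Connection).filter_by(conn_id="fs_default").first() is None:
    # The "path" extra is what Airflow's FSHook reads; "/" is just a placeholder.
    session.add(Connection(conn_id="fs_default", conn_type="fs", extra='{"path": "/"}'))
    session.commit()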

Latest version is incompatible with dbt-core 1.5.0

from dbt.config.runtime import UnsetProfileConfig

ImportError: cannot import name 'UnsetProfileConfig' from 'dbt.config.runtime' (/usr/local/airflow/.local/lib/python3.10/site-packages/dbt/config/runtime.py)

[Feature] Define separate remote for dbt artifact upload

Hi,

Thanks for this project, it looks great and I am looking to switch over to using it. One note from the docs: in order to keep downloads fast, I will probably zip my project dir. At the moment, however, I use S3 for my docs, but I would like to use a different bucket from the one used for pulling the Airflow and dbt resources. Is there an override for the command below to change the upload bucket?

Thanks

    dbt_docs = DbtDocsGenerateOperator(
        task_id="dbt_docs",
        project_dir="s3://my-bucket/dbt/project/key/prefix/",
        profiles_dir="s3://my-bucket/dbt/profiles/key/prefix/",
    )

dbt not writing log to file

It seems that the dbt.log file is not being written inside the temp directory generated by the run.

Is there any extra config that needs to be done?

Thanks!

Implementation in MWAA Issue

Hi Tomas, thank you very much for posting this solution.

I have been trying to implement this in an MWAA Airflow instance without luck. Here is a sample of the log that I am getting.

AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=example_dbt_operator
AIRFLOW_CTX_TASK_ID=dbt_test
AIRFLOW_CTX_EXECUTION_DATE=2021-06-01T20:56:36.080170+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-06-01T20:56:36.080170+00:00
[2021-06-01 20:56:40,658] {{taskinstance.py:1482}} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1138, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
    result = task_copy.execute(context=context)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/operators/dbt.py", line 68, in execute
    args.extend(self.args_list())
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/operators/dbt.py", line 79, in args_list
    value = getattr(self, arg)
AttributeError: project_dir

  • Airflow instance is the latest provided by AWS
  • It has been tested locally using the docker image provided by AWS
  • Code has been copied and pasted to DAG from the example posted.

[Feature] Update env vars rendered in profiles.yml

Wondering if airflow-dbt-python supports env variables in profiles.yml? If yes, how can I pass them to the DAG? If not, is there any workaround until the functionality becomes available?

<profile_name>:
  outputs:
    <source>:
      database: "{{ env_var('DBT_ENV_SECRET_DATABASE') }}"
      password: "{{ env_var('DBT_ENV_SECRET_PASSWORD') }}"
      schema: "{{ env_var('DBT_ENV_SECRET_SCHEMA') }}"
      threads: "{{ env_var('DBT_THREADS') }}"
      type: <type>
      user: "{{ env_var('USER_NAME') }}_{{ env_var('ENV_NAME') }}"
  target: <source>

DBT_ENV = {
    "dbt_host": dbt_host,
    "dbt_port": dbt_port,
    "dbt_dbname": dbt_dbname,
    "dbt_user": dbt_user,
    "dbt_password": dbt_password,
    "dbt_schema": dbt_schema,
}

Something like this?

 dbt_run = DbtRunOperator(
         task_id="dbt_run",
         env=DBT_ENV,
     )

BTW, do we need to define dbt_bin in MWAA?

dbt_bin="/usr/local/airflow/.local/bin/dbt",

Question: testing dbt through this package

Feel free to disregard if this is out of scope, but I'm curious how the developer or other users of this package are testing their mwaa/dbt projects so that they can deploy with confidence. My current process is basically:

  • develop/run dbt locally
  • create a dag for the project
  • push to CI
  • ci runs dbt from the cli with a config that should more closely match production
  • deploy dbt project to s3
  • mwaa picks up dag/code
  • user manually clicks through mwaa dashboard to trigger a run

The interface with mwaa, dbt and this package is where I'm having the hardest time testing code since errors only come up when I'm deep into the process. Is there a way that I could be running dbt locally through this package, or in CI (basically without getting all the way to mwaa) that would allow for faster development?

I've looked through the tests for this package and am thinking about going that route, but it seems to miss some of the airflow issues that could come up. I was also thinking of installing airflow and running the dags from the airflow cli in CI, but again not sure if I'm missing something.

Any advice, even just high level, would be much appreciated.
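One possible way to exercise the operator outside MWAA, assuming Airflow >= 2.5 (where DAG.test() is available) and a local copy of the dbt project and profiles; the dag_id below is hypothetical:

from airflow.models import DagBag

dagbag = DagBag("dags/")            # parse the local DAGs folder
dag = dagbag.get_dag("my_dbt_dag")
dag.test()                          # run all tasks in-process, no scheduler needed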

Setup a workflow to maintain a changelog

It's tiresome to write release notes; I think we could do this incrementally as we work on airflow-dbt-python and then copy the changelog into every release.

I'll dig around to see what tooling is available to set something up.

Can't remove directory of log after dbt run

Hi, we are trying to upgrade airflow-dbt-python from version 0.14.5 to 1.0.2.

But after we upgraded, we get an error that the log directory can't be removed:

[2023-03-27, 04:47:29 UTC] {functions.py:222} INFO - 04:47:29  Done. PASS=16 WARN=0 ERROR=0 SKIP=0 TOTAL=16
[2023-03-27, 04:47:32 UTC] {dbt.py:176} ERROR - An error has ocurred while executing dbt run
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow_dbt_python/operators/dbt.py", line 170, in execute
    result = self.dbt_hook.run_dbt_task(
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow_dbt_python/hooks/dbt.py", line 207, in run_dbt_task
    with self.dbt_directory(
  File "/usr/local/lib/python3.10/contextlib.py", line 142, in __exit__
    next(self.gen)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow_dbt_python/hooks/dbt.py", line 296, in dbt_directory
    with TemporaryDirectory(prefix="airflow_tmp") as tmp_dir:
  File "/usr/local/lib/python3.10/tempfile.py", line 869, in __exit__
    self.cleanup()
  File "/usr/local/lib/python3.10/tempfile.py", line 873, in cleanup
    self._rmtree(self.name, ignore_errors=self._ignore_cleanup_errors)
  File "/usr/local/lib/python3.10/tempfile.py", line 855, in _rmtree
    _shutil.rmtree(name, onerror=onerror)
  File "/usr/local/lib/python3.10/shutil.py", line 724, in rmtree
    _rmtree_safe_fd(fd, path, onerror)
  File "/usr/local/lib/python3.10/shutil.py", line 663, in _rmtree_safe_fd
    onerror(os.rmdir, fullname, sys.exc_info())
  File "/usr/local/lib/python3.10/shutil.py", line 661, in _rmtree_safe_fd
    os.rmdir(entry.name, dir_fd=topfd)
OSError: [Errno 39] Directory not empty: 'logs'

Our code.

dbt_run = DbtRunOperator(
    task_id="dbt_run",
    project_dir=f"s3://{S3_BUCKET}/dbt/dbt-project.zip",
    profiles_dir=f"s3://{S3_BUCKET}/dbt/profiles.yml",
    project_conn_id=S3_CONN,
    profiles_conn_id=S3_CONN,
    fail_fast=True,
)

We run on airflow version 2.5.0 with python 3.10.8

airflow error:AttributeError: module 'airflow.utils.log' has no attribute 'file_processor_handler'

Specs:
Docker image: apache/airflow:2.2.2-python3.7
I'm spinning the environment up using docker-compose and specifying the package as follows:

_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- airflow-dbt-python==0.15.2 dbt-redshift==1.3.0}

The error is

ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
google-ads 14.0.0 requires PyYAML<6.0,>=5.1, but you have pyyaml 6.0 which is incompatible.
apache-airflow 2.2.2 requires markupsafe<2.0,>=1.1.1, but you have markupsafe 2.1.2 which is incompatible.
Successfully installed Jinja2-3.1.2 MarkupSafe-2.1.2 agate-1.6.3 airflow-dbt-python-0.15.2 dbt-core-1.3.2 dbt-extractor-0.4.1 dbt-postgres-1.3.2 dbt-redshift-1.3.0 future-0.18.3 hologram-0.0.15 leather-0.3.4 logbook-1.5.3 mashumaro-3.0.4 minimal-snowplow-tracker-0.0.2 networkx-2.6.3 parsedatetime-2.4 pathspec-0.9.0 pytimeparse-1.1.8 pyyaml-6.0 sqlparse-0.4.3
WARNING: You are using pip version 21.3.1; however, version 22.3.1 is available.
You should consider upgrading via the '/usr/local/bin/python -m pip install --upgrade pip' command.
Unable to load the config, contains a configuration error.
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/logging/config.py", line 388, in resolve
    found = getattr(found, frag)
AttributeError: module 'airflow.utils.log' has no attribute 'file_processor_handler'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/logging/config.py", line 390, in resolve
    self.importer(used)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/log/file_processor_handler.py", line 25, in <module>
    from airflow.utils.helpers import parse_template_string
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/helpers.py", line 27, in <module>
    from flask import url_for
  File "/home/airflow/.local/lib/python3.7/site-packages/flask/__init__.py", line 14, in <module>
    from jinja2 import escape
ImportError: cannot import name 'escape' from 'jinja2' (/home/airflow/.local/lib/python3.7/site-packages/jinja2/__init__.py)

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/logging/config.py", line 563, in configure
    handler = self.configure_handler(handlers[name])
  File "/usr/local/lib/python3.7/logging/config.py", line 713, in configure_handler
    klass = self.resolve(cname)
  File "/usr/local/lib/python3.7/logging/config.py", line 397, in resolve
    raise v
  File "/usr/local/lib/python3.7/logging/config.py", line 390, in resolve
    self.importer(used)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/log/file_processor_handler.py", line 25, in <module>
    from airflow.utils.helpers import parse_template_string
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/helpers.py", line 27, in <module>
    from flask import url_for
  File "/home/airflow/.local/lib/python3.7/site-packages/flask/__init__.py", line 14, in <module>
    from jinja2 import escape
ValueError: Cannot resolve 'airflow.utils.log.file_processor_handler.FileProcessorHandler': cannot import name 'escape' from 'jinja2' (/home/airflow/.local/lib/python3.7/site-packages/jinja2/__init__.py)

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/airflow/.local/bin/airflow", line 5, in <module>
    from airflow.__main__ import main
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/__init__.py", line 46, in <module>
    settings.initialize()
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/settings.py", line 483, in initialize
    LOGGING_CLASS_PATH = configure_logging()
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/logging_config.py", line 74, in configure_logging
    raise e
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/logging_config.py", line 69, in configure_logging
    dictConfig(logging_config)
  File "/usr/local/lib/python3.7/logging/config.py", line 800, in dictConfig
    dictConfigClass(config).configure()
  File "/usr/local/lib/python3.7/logging/config.py", line 571, in configure
    '%r' % name) from e
ValueError: Unable to configure handler 'processor'

But I'm receiving the above error. After some investigation, the most useful thread seemed to be this one, which is related to some naming conflict.

I pinned the version as follows:

psutil==4.4.2

But still no success. I also tried pinning PyYAML to be under 6, but also without success.

Any thoughts?

Airflow MWAA not able to install Spark Adapter

Hi @tomasfarias and thanks for the awesome lib!

I am having some trouble using airflow-dbt-python in a project which uses a Spark adapter to connect to an AWS EMR instance. It's mostly related to the inability to install dbt-spark[PyHive] in MWAA (Airflow version 2.0.2). airflow-dbt-python was installed without a version specification in requirements.

I tried installing it through the airflow-dbt-python[snowflake,postgres,redshift,bigquery,spark] syntax in the requirements file, and also
airflow-dbt-python[all], which actually threw an error saying there was no "'all' option for extras". Aside from that, it works as intended with other backend adapters. Is there any way to use it with the Spark adapter?

Thanks for the attention!

Improve S3 downloads by allowing zip files or pre-compiled models

The possibility of pulling an entire dbt project from S3 has proven to be very valuable in an Airflow context with high task concurrency and a remote executor, as we cannot rely on tasks running on the same machine as their dependencies. Unfortunately, pulling all files one by one from S3 is very slow. There are a couple of solutions I'd like to explore (well, three actually, but the third one is a combination of the previous two):

  • Allow downloading an entire dbt project as a zip file (or other compressed formats). The DbtS3Hook should understand file extensions and produce the appropriate de-compression.
  • Allow downloading pre-compiled queries instead of the entire project (basically everything under your-project/target/compiled/). Of course, we would also need to figure out a way to have dbt run these pre-compiled files and not try to compile them again. This would save quite a bit in dbt modules and macro definitions.
  • Combine the two solutions: compressed, pre-compiled projects.

The second and third are harder to implement, as I'm not sure how we could override the standard dbt behavior, but I'll give it a fair shot. Worst case, we will ship only the first one, which should still be beneficial.
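A sketch of the first option, assuming the hook has already downloaded the object to local_path: detect the extension and unpack it into the temporary project directory; other compressed formats could be handled similarly.

import zipfile
from pathlib import Path

def maybe_extract_project(local_path: Path, destination: Path) -> None:
    """Unpack a downloaded dbt project archive into the temporary project dir."""
    if local_path.suffix == ".zip":
        with zipfile.ZipFile(local_path) as archive:
            archive.extractall(destination)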

dbt_deps question

@tomasfarias I'm playing around with trying to use the dbt_deps operator to install packages during an MWAA run. So far I've been able to get the deps operator to run correctly, but the downstream task, dbt_run, will fail because it cannot find the packages that were just installed. Do I need to hand off the dbt_packages folder via XCom somehow?

I'm happy to add some documentation for the next person, I'm just hoping to get pointed in the right direction.

Thanks again for making this library!

Duplicate log lines

The dbt loggers are producing multiple lines of output that are essentially the same, but one of them is the debug output.

This is caused by both the file logger and the stdout logger being fired off. We tried to clear the
handlers from one of them in operators.dbt.BaseDbtOperator.override_dbt_logging but this is not
working.

We should dig into this issue further to avoid verbose logging outputs.
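A small sketch of one direction to dig into, not the operator's current code: keep a single handler on the dbt logger and stop propagation so each record is emitted once (the logger name "dbt" is an assumption).

import logging

def dedupe_dbt_logging(logger_name: str = "dbt") -> None:
    dbt_logger = logging.getLogger(logger_name)
    for handler in dbt_logger.handlers[1:]:  # keep at most one handler attached
        dbt_logger.removeHandler(handler)
    dbt_logger.propagate = False  # avoid re-emission through ancestor loggers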

MWAA and Environment Variables in profiles.yml

Hello! I found your package tonight and am currently trying to get it set up. After finding workarounds for the settings below, essentially by using the /tmp directory on MWAA, I believe I am on my last issue before a successful deploy.

modules-path
target_path
log-path

I am trying to get secrets read in through my profiles.yml file. I have tried entries like the below but to no avail.

host: "{{ env_var('AIRFLOW__DB_ENDPOINT') }}"
host: "{{ env_var('AIRFLOW_DB_ENDPOINT') }}"
host: "{{ env_var('DB_ENDPOINT') }}"
host: "{{ env_var('AIRFLOW__DB_ENDPOINT', 'not_set') }}"

The last option let me bypass the error temporarily, before it failed when building the model. I know this may not be entirely specific to your package, but I noticed that you have helped people out before with issues getting MWAA running.

https://docs.aws.amazon.com/mwaa/latest/userguide/configuring-env-variables.html - I was using this somewhat but have not yet actually added anything under custom options. As you can see above, I tried adding AIRFLOW_ and AIRFLOW__ to the beginning but no luck.

Thought I would reach out and see if you have had luck with something similar. We just switched over to using AWS Secrets Manager in MWAA and it's been working like a charm in our Python tools, and it still uses Variable.get() (from Airflow's library).

The typical error I get is:

Env var required but not provided: 'DB_ENDPOINT'

but have also gotten this:

Env var required but not provided: 'DB_ENDPOINT'. This can happen when calling a macro that does not exist. Check for typos and/or install package dependencies with "dbt deps".

Any help would be appreciated if you have the time!
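A workaround sketch tied to the Variable.get() note above: export the secret as an environment variable so dbt's env_var() can see it. The variable names are illustrative, and this only helps if it runs in the same process that executes the dbt operator (for example early in the DAG file or in an operator subclass).

import os

from airflow.models import Variable

os.environ["DB_ENDPOINT"] = Variable.get("db_endpoint", default_var="")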

Improve and provide more DAG examples

airflow-dbt-python has grown quite a bit and is racking up some production experience. It would be great if we can expand the examples to cover more intricate DAGs to help provide ideas to folks setting out to work with the operator. In particular, the examples should showcase some of the new features like: pulling project files from S3 or pushing dbt artifacts to XCom.

The README currently only has one example. I believe we can leave that as it is (perhaps review it just in case it's not accurate with the current version of airflow-dbt-python) and add a new examples/ directory for new example DAGs.

connection as a target does not work in MWAA

Running the example code produces the following on AWS MWAA:

session = settings.Session()  # type: ignore
existing = session.query(Connection).filter_by(conn_id="my_db_connection").first()

if existing is None:
    # For illustration purposes, and to keep the example self-contained, we create
    # a Connection using Airflow's ORM. However, any method of loading connections would
    # work, like Airflow's UI, Airflow's CLI, or in deployment scripts.

    my_conn = Connection(
        conn_id="my_db_connection",
        conn_type="redshift",
        description="redshift connection",
        host="abc.ch0n7gct0zxb.us-west-2.redshift.amazonaws.com",
        login="dbt_process_user",
        port=5439,
        schema="stage",
        password= get_secret("dev/dbt_process_user")['password'],  # pragma: allowlist secret
        # Other dbt parameters can be added as extras
        extra=json.dumps(dict(threads=4, sslmode="require")),
    )
    session.add(my_conn)
    session.commit()
with DAG(
    dag_id="dbt_tomasfarias", catchup=False, default_args=default_args, tags=["dbt", "loan tape"], schedule_interval="0 11 * * *"
) as dag:
    dbt_run = DbtRunOperator(
        task_id="dbt_run",
        target="my_db_connection",
        #dbt_bin="/usr/local/airflow/.local/bin/dbt",
        profiles_dir=None,
        project_dir="/usr/local/airflow/dags/dbt/etl/",
  
    )
[2022-11-02, 16:11:29 UTC] {{taskinstance.py:1703}} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1332, in _run_raw_task
    self._execute_task_with_callbacks(context)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1458, in _execute_task_with_callbacks
    result = self._execute_task(context, self.task)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1514, in _execute_task
    result = execute_callable(context=context)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/operators/dbt.py", line 140, in execute
    config = self.get_dbt_config()
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/operators/dbt.py", line 185, in get_dbt_config
    return factory.create_config(**config_kwargs)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/hooks/dbt.py", line 374, in create_config
    initialize_config_values(config)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/dbt/main.py", line 170, in initialize_config_values
    cfg = read_user_config(parsed.profiles_dir)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/dbt/config/profile.py", line 74, in read_user_config
    profile = read_profile(directory)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/dbt/config/profile.py", line 50, in read_profile
    path = os.path.join(profiles_dir, 'profiles.yml')
  File "/usr/lib64/python3.7/posixpath.py", line 80, in join
    a = os.fspath(a)
TypeError: expected str, bytes or os.PathLike object, not NoneType

Implement Apache Airflow provider's hook interface

Not quite an interface as it's not defined in BaseHook, but most Apache Airflow providers define proper connection types for each of their hooks. This may be a chance to refactor the hooks module into individual hook classes for each dbt supported target type. Furthermore, this can be the chance to clean up the docstring documentation as it's missing a lot of information.

Perhaps we can apply to be integrated into Apache Airflow as an official provider? Currently, a dbt Cloud provider package exists, but nothing for "vanilla" dbt.

dbt seed operator only runs successfully on first run after reboot of mwaa

I'm using this very helpful package to run dbt code on mwaa and have encountered a weird bug where the dbt seed operator only works on the first run after the mwaa environment has been created or updated. Subsequent runs fail with a .csv not found error. Notably, dbt seed appears to be looking in the wrong temp directory on the second run.

An example of the failure: [Errno 2] No such file or directory: '/tmp/airflowtmpeijb2yn7/data/METRICS_SEED.csv'
The temp directory, in this case airflowtmpeijb2yn7, does not match the directory that the "fetch from s3" step has downloaded all the dbt files into. Looking at that log, I can see that the .csv file I wanted was downloaded, along with all the other dbt files into a different subdirectory of tmp.

  • All dbt related files live in s3 buckets
  • dbt deps is run in ci so that it isn't called every time in airflow

I'm not sure if this is even an issue with this package or a bug with mwaa or dbt, but I thought I'd raise it here first.

[Feature] Support HTTP authentication for DbtGitRemote

I am trying to use a GitLab dbt project repo using DbtGitRemoteHook

 dbt_run = DbtRunOperator(
        task_id="dbt_run",
        project_dir="https://domain/abc/-/tree/main/dbt/db_metrics?private_token=abcdesf",
        dbt_conn_id="dbt_conn_id",
        target="dev",
        do_xcom_push_artifacts=["run_results.json"],
    )

    raise MaxRetryError(_pool, url, error or ResponseError(cause))
urllib3.exceptions.MaxRetryError: HTTPSConnectionPool(host='scm.platform.us-west-2.io', port=443): Max retries exceeded with url: https://scm.platform.us-west-2.io/users/auth/saml (Caused by ResponseError('too many redirects'))

When I tried to access the repository in the browser with HTTPS and the token, I was able to get in.

I also tried with the project_dir below:

project_dir="https://$gitlabUser:$gitlabToken@domain/abc.git",

File "/home/airflow/.local/lib/python3.10/site-packages/dulwich/client.py", line 705, in clone
  result = self.fetch(path, target, progress=progress, depth=depth)
File "/home/airflow/.local/lib/python3.10/site-packages/dulwich/client.py", line 782, in fetch
  result = self.fetch_pack(
File "/home/airflow/.local/lib/python3.10/site-packages/dulwich/client.py", line 2085, in fetch_pack
  refs, server_capabilities, url = self._discover_references(
File "/home/airflow/.local/lib/python3.10/site-packages/dulwich/client.py", line 1941, in _discover_references
  resp, read = self._http_request(url, headers)
File "/home/airflow/.local/lib/python3.10/site-packages/dulwich/client.py", line 2219, in _http_request
  raise HTTPUnauthorized(resp.headers.get("WWW-Authenticate"), url)
dulwich.client.HTTPUnauthorized: No valid credentials provided
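
One thing worth noting: in the project_dir above, $gitlabUser and $gitlabToken are plain characters inside a Python string; nothing expands them, so dulwich sends them to the server literally and the credentials are rejected. Until HTTP authentication is supported natively, a hedged workaround sketch (the Airflow Variable names are assumptions) is to build the authenticated URL in Python:

from urllib.parse import quote

from airflow.models import Variable
from airflow_dbt_python.operators.dbt import DbtRunOperator

# Assumed Airflow Variables holding the GitLab credentials.
gitlab_user = Variable.get("gitlab_user")
gitlab_token = Variable.get("gitlab_token")

dbt_run = DbtRunOperator(
    task_id="dbt_run",
    # URL-encode the credentials in case they contain reserved characters.
    project_dir=f"https://{quote(gitlab_user, safe='')}:{quote(gitlab_token, safe='')}@domain/abc.git",
    target="dev",
    do_xcom_push_artifacts=["run_results.json"],
)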

dbt docs create not available

Love this repo. Thanks for all your hard work! Is there a technical reason why the docs command has been omitted?

Loading dbt from S3 zip file doesn't work

Hello,
Testing version airflow-dbt-python==0.11.0 on MWAA 2.0.2, it works flawlessly with:

dbt_test = DbtTestOperator(
    task_id="dbt_run_daily",
    project_dir="s3://s3-bucket/s3-prefix/dbt/",
    profiles_dir="s3://s3-bucket/s3-prefix/dbt_profile",
    target="dev",
    profile="dev_profile",
    do_xcom_push_artifacts=["run_results.json"],
)

but if I try using a dbt.zip file:

dbt_test = DbtTestOperator(
    task_id="dbt_run_daily",
    project_dir="s3://s3-bucket/prefix/dbt/dbt.zip",
    profiles_dir="s3://s3-bucket/prefix/dbt_profile",
    target="dev",
    profile="dev_profile",
    do_xcom_push_artifacts=["run_results.json"],
)

where dbt.zip file structure is:

d----- 24/01/2022 19:05 analyses
d----- 24/01/2022 19:05 data
d----- 27/01/2022 14:59 logs
d----- 24/01/2022 19:05 macros
d----- 01/02/2022 18:42 models
d----- 27/01/2022 14:48 packages
d----- 24/01/2022 19:05 snapshots
d----- 27/01/2022 15:01 target
d----- 01/02/2022 18:54 tests
-a---- 24/01/2022 19:05 29 .gitignore
-a---- 27/01/2022 15:19 1339 dbt_project.yml
-a---- 27/01/2022 14:50 88 packages.yml
-a---- 24/01/2022 19:05 571 README.md

I can see these logs:

[2022-02-07 09:57:04,793] {{dbt.py:263}} INFO - Fetching profiles.yml from S3: s3://s3-bucket/s3-prefix/dbt_profile
[2022-02-07 09:57:04,805] {{s3.py:35}} INFO - Downloading dbt profiles file from: s3://s3-bucket/s3-prefix/dbt_profile
[2022-02-07 09:57:04,806] {{base_aws.py:368}} INFO - Airflow Connection: aws_conn_id=aws_default
[2022-02-07 09:57:04,840] {{logging_mixin.py:104}} INFO - [2022-02-07 09:57:04,840] {{base_aws.py:179}} INFO - No credentials retrieved from Connection
[2022-02-07 09:57:04,840] {{logging_mixin.py:104}} INFO - [2022-02-07 09:57:04,840] {{base_aws.py:87}} INFO - Creating session with aws_access_key_id=None region_name=None
[2022-02-07 09:57:04,849] {{logging_mixin.py:104}} INFO - [2022-02-07 09:57:04,849] {{base_aws.py:157}} INFO - role_arn is None
[2022-02-07 09:57:05,063] {{s3.py:53}} INFO - Saving s3.Object(bucket_name='s3-bucket', key='s3-prefix/dbt_profile/profiles.yml') file to: /tmp/airflowtmpmtcgxrxd/profiles.yml
[2022-02-07 09:57:05,093] {{dbt.py:271}} INFO - Fetching dbt project from S3: s3://s3-bucket/s3-prefix/dbt/dbt.zip
[2022-02-07 09:57:05,094] {{s3.py:85}} INFO - Downloading dbt project files from: s3://s3-bucket/s3-prefix/dbt/dbt.zip
[2022-02-07 09:57:05,094] {{base_aws.py:368}} INFO - Airflow Connection: aws_conn_id=aws_default
[2022-02-07 09:57:05,127] {{logging_mixin.py:104}} INFO - [2022-02-07 09:57:05,127] {{base_aws.py:179}} INFO - No credentials retrieved from Connection
[2022-02-07 09:57:05,127] {{logging_mixin.py:104}} INFO - [2022-02-07 09:57:05,127] {{base_aws.py:87}} INFO - Creating session with aws_access_key_id=None region_name=None
[2022-02-07 09:57:05,137] {{logging_mixin.py:104}} INFO - [2022-02-07 09:57:05,137] {{base_aws.py:157}} INFO - role_arn is None
[2022-02-07 09:57:05,222] {{s3.py:53}} INFO - Saving s3.Object(bucket_name='s3-bucket', key='s3-prefix/dbt/dbt.zip') file to: /tmp/airflowtmpmtcgxrxd/dbt_project.zip
[2022-02-07 09:57:05,411] {{dbt.py:152}} INFO - Running dbt configuration: TestTaskConfig(cls=<class 'dbt.task.test.TestTask'>, project_dir='/tmp/airflowtmpmtcgxrxd/', profiles_dir='/tmp/airflowtmpmtcgxrxd/', profile='dev_profile', target='dev', compiled_target=None, fail_fast=None, single_threaded=None, threads=None, use_experimental_parser=None, vars='{}', warn_error=None, log_format=None, log_cache_events=False, record_timing_info=None, debug=None, defer=None, partial_parse=None, use_colors=None, static_parser=None, version_check=None, send_anonymous_usage_stats=None, write_json=None, exclude=None, select=None, selector_name=None, state=None, models=None, generic=None, indirect_selection=None, singular=None, store_failures=None, which='test')
[2022-02-07 09:57:05,632] {{functions.py:248}} INFO - 09:57:05.632049 [info ] [MainThread]: Partial parse save file not found. Starting full parse.
[2022-02-07 09:57:05,632] {{log.py:235}} WARNING - 09:57:05 Partial parse save file not found. Starting full parse.
[2022-02-07 09:57:05,632] {{functions.py:248}} INFO - 09:57:05 Partial parse save file not found. Starting full parse.
[2022-02-07 09:57:06,226] {{functions.py:248}} INFO - 09:57:06.226743 [info ] [MainThread]: Found 0 models, 0 tests, 0 snapshots, 0 analyses, 191 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics
[2022-02-07 09:57:06,227] {{log.py:235}} WARNING - 09:57:06 Found 0 models, 0 tests, 0 snapshots, 0 analyses, 191 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics
[2022-02-07 09:57:06,227] {{functions.py:248}} INFO - 09:57:06 Found 0 models, 0 tests, 0 snapshots, 0 analyses, 191 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics
[2022-02-07 09:57:06,228] {{functions.py:248}} INFO - 09:57:06.228139 [info ] [MainThread]:
[2022-02-07 09:57:06,228] {{log.py:235}} WARNING - 09:57:06
[2022-02-07 09:57:06,228] {{functions.py:248}} INFO - 09:57:06
[2022-02-07 09:57:06,228] {{functions.py:250}} WARNING - 09:57:06.228662 [warn ] [MainThread]: [WARNING]: Nothing to do. Try checking your model configs and model specification args
[2022-02-07 09:57:06,229] {{log.py:235}} WARNING - 09:57:06 [
[2022-02-07 09:57:06,229] {{log.py:235}} WARNING - WARNING
[2022-02-07 09:57:06,229] {{log.py:235}} WARNING - ]: Nothing to do. Try checking your model configs and model specification args
[2022-02-07 09:57:06,229] {{functions.py:250}} WARNING - 09:57:06 [WARNING]: Nothing to do. Try checking your model configs and model specification args
[2022-02-07 09:57:06,282] {{taskinstance.py:1192}} INFO - Marking task as SUCCESS. dag_id=dbt_python_tests, task_id=dbt_run_daily, execution_date=20220207T095658, start_date=20220207T095704, end_date=20220207T095706
[2022-02-07 09:57:06,310] {{taskinstance.py:1246}} INFO - 1 downstream tasks scheduled from follow-on schedule check
[2022-02-07 09:57:09,764] {{logging_mixin.py:104}} INFO - [2022-02-07 09:57:09,764] {{local_task_job.py:188}} WARNING - State of this instance has been externally set to success. Terminating instance.

But the tests (there are 6) and macros (more than 500) are not being retrieved. I tried several variations but had no luck. Did I do something wrong?
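
One thing the directory listing above doesn't show is the layout inside dbt.zip itself: if the archive contains a top-level folder, extraction leaves dbt_project.yml one level below the temporary directory and dbt finds almost nothing to parse. This is only a hypothesis, but as a sketch (local paths and bucket names are placeholders), packaging the project with dbt_project.yml at the archive root would look like this:

import shutil

import boto3

# Zip the *contents* of the local project directory, so dbt_project.yml, models/,
# tests/ and macros/ sit at the root of dbt.zip rather than inside a subfolder.
shutil.make_archive("dbt", "zip", root_dir="path/to/local/dbt/project")

# Upload the archive to the prefix referenced by project_dir.
boto3.client("s3").upload_file("dbt.zip", "s3-bucket", "prefix/dbt/dbt.zip")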

Support for Python 3.10 to CI/CD checks and deprecate 3.7

Python 3.10 has been out for a while. We should be able to support it without issues, but we won't know for sure until we add the new version to CI/CD.

Also, we are deprecating Python 3.7. I think supporting the last three versions should be enough.

[Bug/Feature] dbt does nothing on a nonexistent model

Context


By default, dbt run --select "a_model_that_does_not_exist" will do nothing and only print a warning:

WARNING: Nothing to do. Try checking your model configs and model specification args

Example


If one creates a DAG with a single DbtRunOperator selecting "a_model_that_does_not_exist", it runs successfully, which I don't think is the right behaviour:

from airflow_dbt_python.operators.dbt import DbtRunOperator
from airflow import DAG

with DAG(...) as dag:
    operator = DbtRunOperator(
        models=["this_does_not_exist"],
        ...
    )


I just get a "silently failing" successful task in Airflow when it runs.
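
Until the operator itself can fail on an empty selection, a hedged workaround sketch (it assumes the run_results.json artifact is pushed to XCom under that same key via do_xcom_push_artifacts) is to add a check task that fails when dbt ran zero nodes:

from airflow.operators.python import PythonOperator
from airflow_dbt_python.operators.dbt import DbtRunOperator

dbt_run = DbtRunOperator(
    task_id="dbt_run",
    models=["this_does_not_exist"],
    do_xcom_push_artifacts=["run_results.json"],
)


def assert_models_ran(ti):
    # Pull the artifact pushed by the dbt task and fail if no nodes were selected.
    run_results = ti.xcom_pull(task_ids="dbt_run", key="run_results.json")
    if not run_results or not run_results.get("results"):
        raise ValueError("dbt selected no models: nothing was run")


check = PythonOperator(task_id="assert_models_ran", python_callable=assert_models_ran)
dbt_run >> check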

dbt logs all threads instead of just main thread

There's a small bug when running with dbt>=1.0: when running multiple threads, airflow-dbt-python captures the main thread's log as well as each individual thread's, so log lines are duplicated (once for the main thread and once per worker thread). This is spammy and makes the logs hard to read; let's ensure we only log the main thread.
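
For context, the standard Python-logging way to achieve this (a sketch of the mechanism, not necessarily how the fix will land in airflow-dbt-python) is a Filter attached to the handler, which sees every record regardless of which dbt logger produced it:

import logging
import threading


class MainThreadOnly(logging.Filter):
    """Drop records emitted by dbt worker threads, keeping only the main thread."""

    def filter(self, record: logging.LogRecord) -> bool:
        return record.threadName == threading.main_thread().name


# Attach the filter to the task's handlers so duplicated per-thread lines are suppressed.
for handler in logging.getLogger().handlers:
    handler.addFilter(MainThreadOnly())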
