The "download_result_" tasks fail when a DAG run is created from the DAG defined in https://github.com/GoogleCloudPlatform/ci-cd-for-data-processing-workflow/blob/master/source-code/workflow-dag/data-pipeline-test.py. The Dataflow pipeline itself completes successfully, but its sharding behavior appears to have changed since this DAG was written: all of the output is now written to a single shard, while the DAG still expects three shards (for example, download_result_1 requests output-00000-of-00003). As a result, the download_result_ tasks fail with a 404 "No such object" (NotFound) error. The log for the failing download_result_1 task is included below.
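The object name the task requests, output-00000-of-00003, encodes both the shard index and the total shard count, so a run that writes a different number of shards produces entirely different object names. The sketch below reproduces this, assuming the output follows the "<prefix>-SSSSS-of-NNNNN" template (five-digit zero padding) seen in the log; `shard_names` is a hypothetical helper, and the single-shard name is inferred from that template rather than taken from the bucket.

```python
# Sketch: reproduce the shard object names seen in the log.
# Assumption: output files follow the "<prefix>-SSSSS-of-NNNNN" template,
# i.e. shard index and shard count are both zero-padded to five digits.


def shard_names(prefix, num_shards):
    """Return the object names a writer with `num_shards` shards would produce."""
    return [
        "{}-{:05d}-of-{:05d}".format(prefix, i, num_shards)
        for i in range(num_shards)
    ]


expected = shard_names("output", 3)  # names the download_result_ tasks request
actual = shard_names("output", 1)    # names a single-shard run would write

print(expected)
print(actual)
# Every expected name is missing, which matches the 404 "No such object" error.
print([name for name in expected if name not in actual])
```

Because the shard count is part of every file name, none of the three hard-coded names exist when the pipeline writes one shard, not just the second and third.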
*** Reading remote log from gs://us-central1-data-pipeline-c-ae7165ed-bucket/logs/test_word_count/download_result_1/2021-10-18T21:16:11+00:00/1.log.
[2021-10-18 21:22:08,103] {taskinstance.py:671} INFO - Dependencies all met for <TaskInstance: test_word_count.download_result_1 2021-10-18T21:16:11+00:00 [queued]>
[2021-10-18 21:22:08,176] {taskinstance.py:671} INFO - Dependencies all met for <TaskInstance: test_word_count.download_result_1 2021-10-18T21:16:11+00:00 [queued]>
[2021-10-18 21:22:08,177] {taskinstance.py:881} INFO -
--------------------------------------------------------------------------------
[2021-10-18 21:22:08,178] {taskinstance.py:882} INFO - Starting attempt 1 of 1
[2021-10-18 21:22:08,178] {taskinstance.py:883} INFO -
--------------------------------------------------------------------------------
[2021-10-18 21:22:08,226] {taskinstance.py:902} INFO - Executing <Task(GoogleCloudStorageDownloadOperator): download_result_1> on 2021-10-18T21:16:11+00:00
[2021-10-18 21:22:08,230] {standard_task_runner.py:54} INFO - Started process 817 to run task
[2021-10-18 21:22:08,317] {standard_task_runner.py:77} INFO - Running: ['airflow', 'run', 'test_word_count', 'download_result_1', '2021-10-18T21:16:11+00:00', '--job_id', '10', '--pool', 'default_pool', '--raw', '-sd', 'DAGS_FOLDER/data-pipeline-test.py', '--cfg_path', '/tmp/tmpoA3Yca']
[2021-10-18 21:22:08,318] {standard_task_runner.py:78} INFO - Job 10: Subtask download_result_1
[2021-10-18 21:22:09,010] {logging_mixin.py:120} INFO - Running <TaskInstance: test_word_count.download_result_1 2021-10-18T21:16:11+00:00 [running]> on host airflow-worker-86677b8bb6-dnz5q
[2021-10-18 21:22:09,351] {gcs_download_operator.py:86} INFO - Executing download: qwiklabs-gcp-03-cd2d00dd104f-composer-result-test, output-00000-of-00003, None
[2021-10-18 21:22:09,401] {gcp_api_base_hook.py:145} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
[2021-10-18 21:22:09,617] {taskinstance.py:1152} ERROR - 404 GET https://storage.googleapis.com/download/storage/v1/b/qwiklabs-gcp-03-cd2d00dd104f-composer-result-test/o/output-00000-of-00003?alt=media: No such object: qwiklabs-gcp-03-cd2d00dd104f-composer-result-test/output-00000-of-00003: (u'Request failed with status code', 404, u'Expected one of', 200, 206)
Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 985, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/airflow/airflow/contrib/operators/gcs_download_operator.py", line 94, in execute
    object=self.object)
  File "/usr/local/lib/airflow/airflow/contrib/hooks/gcs_hook.py", line 179, in download
    return blob.download_as_string()
  File "/usr/local/lib/python2.7/dist-packages/google/cloud/storage/blob.py", line 1391, in download_as_string
    timeout=timeout,
  File "/usr/local/lib/python2.7/dist-packages/google/cloud/storage/blob.py", line 1302, in download_as_bytes
    checksum=checksum,
  File "/usr/local/lib/python2.7/dist-packages/google/cloud/storage/client.py", line 731, in download_blob_to_file
    _raise_from_invalid_response(exc)
  File "/usr/local/lib/python2.7/dist-packages/google/cloud/storage/blob.py", line 3936, in _raise_from_invalid_response
    raise exceptions.from_http_status(response.status_code, message, response=response)
NotFound: 404 GET https://storage.googleapis.com/download/storage/v1/b/qwiklabs-gcp-03-cd2d00dd104f-composer-result-test/o/output-00000-of-00003?alt=media: No such object: qwiklabs-gcp-03-cd2d00dd104f-composer-result-test/output-00000-of-00003: (u'Request failed with status code', 404, u'Expected one of', 200, 206)
[2021-10-18 21:22:09,668] {taskinstance.py:1196} INFO - Marking task as FAILED. dag_id=test_word_count, task_id=download_result_1, execution_date=20211018T211611, start_date=20211018T212208, end_date=20211018T212209
[2021-10-18 21:22:13,158] {local_task_job.py:102} INFO - Task exited with return code 1
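The log shows the operator downloading a fixed object name, so the task breaks whenever the shard count changes. One possible direction, sketched below under stated assumptions, is to discover the shard files from a bucket listing instead of hard-coding them. `find_shards` and `SHARD_RE` are hypothetical names; the listing itself would have to come from elsewhere, for example (an assumption, not verified against this environment) the `list(bucket, prefix=...)` method of the contrib `GoogleCloudStorageHook` in Airflow 1.10.

```python
import re

# Hypothetical helper: select sharded output files from a bucket listing
# without hard-coding the shard count. Assumes the "<prefix>-SSSSS-of-NNNNN"
# naming scheme seen in the log.
SHARD_RE = re.compile(r"^(?P<prefix>.+)-(?P<index>\d{5})-of-(?P<total>\d{5})$")


def find_shards(object_names, prefix="output"):
    """Return the shard names for `prefix`, sorted by shard index.

    Raises ValueError if no shards exist, if the listing mixes shard
    counts (e.g. leftovers from an older run), or if a shard is missing.
    """
    shards = {}
    totals = set()
    for name in object_names:
        match = SHARD_RE.match(name)
        if not match or match.group("prefix") != prefix:
            continue  # ignore unrelated objects in the bucket
        shards[int(match.group("index"))] = name
        totals.add(int(match.group("total")))
    if not shards:
        raise ValueError("no shards found for prefix %r" % prefix)
    if len(totals) != 1:
        raise ValueError("mixed shard counts: %s" % sorted(totals))
    total = totals.pop()
    if sorted(shards) != list(range(total)):
        raise ValueError("expected %d shards, found %d" % (total, len(shards)))
    return [shards[i] for i in range(total)]


# Usage with a hypothetical single-shard listing:
print(find_shards(["output-00000-of-00001", "logs/run.txt"]))
```

The completeness check is deliberate: if a listing still contains files from a previous three-shard run alongside a new single-shard file, the helper raises instead of silently mixing results from two runs.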