
Airflow configuration for Telemetry

Home Page: https://workflow.telemetry.mozilla.org/

License: Mozilla Public License 2.0


telemetry-airflow's Introduction

Telemetry-Airflow


Apache Airflow is a platform to programmatically author, schedule and monitor workflows.

This repository codifies the Airflow cluster that is deployed at workflow.telemetry.mozilla.org (behind SSO) and commonly referred to as "WTMO" or simply "Airflow".

Some links relevant to users and developers of WTMO:

  • The dags directory in this repository contains some custom DAG definitions
  • Many of the DAGs registered with WTMO don't live in this repository, but are instead generated from ETL task definitions in bigquery-etl
  • The Data SRE team maintains a WTMO Developer Guide (behind SSO)

Writing DAGs

See Airflow's Best Practices guide for help writing DAGs.

⚠ Warning: Do not import resources from the dags directory in DAG definition files ⚠

As an example, if you have dags/dag_a.py and dags/dag_b.py and want to use a helper function in both DAG definition files, define the helper function in the utils directory, like so:

utils/helper.py

def helper_function():
    return "Help"

dags/dag_a.py

from airflow import DAG

from utils.helper import helper_function

with DAG("dag_a", ...):
    ...

dags/dag_b.py

from airflow import DAG

from utils.helper import helper_function

with DAG("dag_b", ...):
    ...

WTMO deployments use git-sync sidecars to synchronize DAG files from multiple repositories via telemetry-airflow-dags using git submodules. The git-sync sidecar pattern results in the following directory structure once deployed.

airflow
├─ dags
│  └── repo
│      └── telemetry-airflow-dags
│          ├── <submodule repo_a>
│          │    └── dags
│          │        └── <dag files>
│          ├── <submodule repo_b>
│          │    └── dags
│          │        └── <dag files>
│          └── <submodule repo_c>
│               └── dags
│                   └── <dag files>
├─ utils
│  └── ...
└─ plugins
   └── ...

Hence, defining helper_function() in dags/dag_a.py and importing it in dags/dag_b.py as from dags.dag_a import helper_function will not work after deployment because of the directory structure required by the git-sync sidecars.
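In other words (illustrative only, mirroring the example above):

dags/dag_b.py

# from dags.dag_a import helper_function  # breaks once git-sync lays out the repos
from utils.helper import helper_function   # works: shared helpers live in utils/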

Prerequisites

This app is built and deployed with Docker and docker-compose. Python dependencies are managed with pip-tools' pip-compile.

You'll also need to install PostgreSQL to build the database container.

Installing dependencies locally

⚠ Make sure you use the right Python version. Refer to the Dockerfile for the currently supported Python version. ⚠

You can install the project dependencies locally to run tests with Pytest. We use the official Airflow constraints file to simplify Airflow dependency management. Install dependencies locally using the following command:

make pip-install-local

Updating Python dependencies

Add new Python dependencies to requirements.in or requirements-dev.in, then run the following commands:

make pip-compile
make pip-install-local

Build Container

Build the Airflow image with:

make build

Local Deployment

To deploy the Airflow containers on the local Docker engine, along with their required dependencies, run:

make build
make up

macOS

Assuming you're using Docker Desktop for macOS: start the Docker service, click the Docker icon in the menu bar, open Preferences, and increase the available memory to 4 GB.

Testing

Adding dummy credentials

Tasks often require credentials to access external services. For example, one may choose to store API keys in an Airflow connection or variable. These variables are sure to exist in production but are often not mirrored locally for logistical reasons. Providing a dummy variable is the preferred way to keep the local development environment up to date.

Update resources/dev_variables.env and resources/dev_connections.env with appropriate dummy values to prevent broken workflows.
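For example, a minimal sketch of what such entries might look like, assuming the env files follow Airflow's standard environment-variable naming (AIRFLOW_VAR_* for variables, AIRFLOW_CONN_* for connections) — the actual keys you need depend on the DAGs you are testing:

# resources/dev_variables.env (hypothetical entry)
AIRFLOW_VAR_MY_API_KEY=dummy-value

# resources/dev_connections.env (hypothetical entry)
AIRFLOW_CONN_MY_SERVICE=http://user:pass@localhost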

Usage

You can now connect to your local Airflow web console at http://localhost:8080/.

All DAGs are paused by default for local instances and our staging instance of Airflow. To run a DAG via the UI, toggle it from "Off" to "On". You'll likely want to toggle the DAG back to "Off" as soon as your desired task starts running.
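If you prefer the command line, the same can be done with the Airflow CLI from inside a running container (a sketch; the docker-compose service name below is an assumption, so check docker-compose.yml):

# Unpause a DAG and trigger a run of it ("web" is an assumed service name)
docker-compose exec web airflow dags unpause dag_a
docker-compose exec web airflow dags trigger dag_a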

Testing GKE Jobs (including BigQuery-etl changes)

See https://go.corp.mozilla.com/wtmodev for more details.

make build && make up
make gke

When done:
make clean-gke

From there, connect to Airflow and enable your job.

Testing Dataproc Jobs

Dataproc jobs run on a self-contained Dataproc cluster, created by Airflow.

To test these jobs, you'll need a sandbox account and a corresponding service account. For information on creating those, see "Testing GKE Jobs". Your service account will need Dataproc and GCS permissions (and BigQuery, if you're connecting to it). Note: Dataproc requires the "Dataproc/Dataproc Worker" role as well as Compute Admin permissions. You'll also need to ensure that the Dataproc API is enabled in your sandbox project.

Ensure that your Dataproc job has a configurable project to write to. Set the project in the DAG entry so it can be configured based on the development environment; see the ltv.py job for an example, and the sketch below.
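A minimal sketch of that pattern (the variable name and project ids are hypothetical, not the actual values used in ltv.py):

from os import environ

# Hypothetical: pick the output project from an environment variable so a
# local run writes to a sandbox project rather than production.
is_dev = environ.get("DEPLOY_ENVIRONMENT", "dev") != "prod"
project_id = "my-sandbox-project" if is_dev else "production-project-id"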

From there, run the following:

make build && make up
./bin/add_gcp_creds $GOOGLE_APPLICATION_CREDENTIALS google_cloud_airflow_dataproc

You can then connect to Airflow locally. Enable your DAG and see that it runs correctly.

Production Setup

This repository is structured to be deployed using the official Airflow Helm Chart. See the Production Guide for best practices.

Debugging

Some useful docker tricks for development and debugging:

make clean

# Remove any leftover docker volumes:
docker volume rm $(docker volume ls -qf dangling=true)

# Purge docker volumes (helps with postgres container failing to start)
# Careful as this will purge all local volumes not used by at least one container.
docker volume prune

telemetry-airflow's People

Contributors

acmiyaguchi, akkomar, alekhyamoz, anich, benwu, chelseatroy, crankycoder, dexterp37, edugfilho, eu9ene, fbertsch, hackebrot, haroldwoo, harterrt, jasonthomas, jklukas, kik-kik, lelilia, mikaeld, mreid-moz, quiiver, relud, robhudson, robotblake, scholtzan, sean-rose, sunahsuh, vitillo, whd, wlach


telemetry-airflow's Issues

Task 'event_events' in the dag 'copy_deduplicate' failing due to table/view not found exception (telemetry.event)

[2020-10-23 01:13:41,522] {logging_mixin.py:112} INFO - [2020-10-23 01:13:41,521] {pod_launcher.py:125} INFO - b'gcp:telemetry.event was not found in location US\n'
[2020-10-23 01:13:41,680] {logging_mixin.py:112} INFO - [2020-10-23 01:13:41,680] {pod_launcher.py:125} INFO - b'Traceback (most recent call last):\n'
[2020-10-23 01:13:41,681] {logging_mixin.py:112} INFO - [2020-10-23 01:13:41,681] {pod_launcher.py:125} INFO - b' File "/usr/local/lib/python3.8/runpy.py", line 194, in _run_module_as_main\n'
[2020-10-23 01:13:41,681] {logging_mixin.py:112} INFO - [2020-10-23 01:13:41,681] {pod_launcher.py:125} INFO - b' return _run_code(code, main_globals, None,\n'
[2020-10-23 01:13:41,681] {logging_mixin.py:112} INFO - [2020-10-23 01:13:41,681] {pod_launcher.py:125} INFO - b' File "/usr/local/lib/python3.8/runpy.py", line 87, in _run_code\n'
[2020-10-23 01:13:41,681] {logging_mixin.py:112} INFO - [2020-10-23 01:13:41,681] {pod_launcher.py:125} INFO - b' exec(code, run_globals)\n'
[2020-10-23 01:13:41,681] {logging_mixin.py:112} INFO - [2020-10-23 01:13:41,681] {pod_launcher.py:125} INFO - b' File "/app/bigquery_etl/run_query.py", line 107, in \n'
[2020-10-23 01:13:41,682] {logging_mixin.py:112} INFO - [2020-10-23 01:13:41,682] {pod_launcher.py:125} INFO - b' main()\n'
[2020-10-23 01:13:41,682] {logging_mixin.py:112} INFO - [2020-10-23 01:13:41,682] {pod_launcher.py:125} INFO - b' File "/app/bigquery_etl/run_query.py", line 97, in main\n'
[2020-10-23 01:13:41,682] {logging_mixin.py:112} INFO - [2020-10-23 01:13:41,682] {pod_launcher.py:125} INFO - b' run(\n'
[2020-10-23 01:13:41,682] {logging_mixin.py:112} INFO - [2020-10-23 01:13:41,682] {pod_launcher.py:125} INFO - b' File "/app/bigquery_etl/run_query.py", line 91, in run\n'
[2020-10-23 01:13:41,682] {logging_mixin.py:112} INFO - [2020-10-23 01:13:41,682] {pod_launcher.py:125} INFO - b' subprocess.check_call(["bq"] + query_arguments, stdin=query_stream)\n'
[2020-10-23 01:13:41,682] {logging_mixin.py:112} INFO - [2020-10-23 01:13:41,682] {pod_launcher.py:125} INFO - b' File "/usr/local/lib/python3.8/subprocess.py", line 364, in check_call\n'
[2020-10-23 01:13:41,682] {logging_mixin.py:112} INFO - [2020-10-23 01:13:41,682] {pod_launcher.py:125} INFO - b' raise CalledProcessError(retcode, cmd)\n'
[2020-10-23 01:13:41,683] {logging_mixin.py:112} INFO - [2020-10-23 01:13:41,683] {pod_launcher.py:125} INFO - b"subprocess.CalledProcessError: Command '['bq', 'query', '--project_id=dataset', '--parameter=submission_date:DATE:2020-10-22', '--schema_update_option=ALLOW_FIELD_ADDITION', '--dataset_id=telemetry_derived', '--destination_table=event_events_v1$20201022']' returned non-zero exit status 2.\n"
[2020-10-23 01:13:42,732] {logging_mixin.py:112} INFO - [2020-10-23 01:13:42,731] {pod_launcher.py:142} INFO - Event: event-events-66c5f787 had an event of type Failed
[2020-10-23 01:13:42,783] {taskinstance.py:1145} ERROR - Pod Launching failed: Pod returned a failure: failed
Traceback (most recent call last):
File "/app/dags/operators/backport/kubernetes_pod_operator_1_10_7.py", line 268, in execute
'Pod returned a failure: {state}'.format(state=final_state)
airflow.exceptions.AirflowException: Pod returned a failure: failed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 983, in _run_raw_task
result = task_copy.execute(context=context)
File "/app/dags/operators/gcp_container_operator.py", line 98, in execute
result = super(UpstreamGKEPodOperator, self).execute(context) # Moz specific
File "/app/dags/operators/backport/kubernetes_pod_operator_1_10_7.py", line 273, in execute
raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod returned a failure: failed
[2020-10-23 01:13:42,785] {taskinstance.py:1187} INFO - All retries failed; marking task as FAILED.dag_id=copy_deduplicate, task_id=event_events, execution_date=20201022T010000, start_date=20201023T011333, end_date=20201023T011342

Create Docker Image directory which deploys to Dockerhub

The PodOperator is great because it allows us to use specific environments for different jobs. The idea here is for us to store Airflow job Dockerfiles in telemetry-airflow. These will be built as part of CI. We can optionally build them in CI nightly, or as an Airflow step, if we need that.

These images will live in /images, which does not need to be included in the telemetry-airflow:latest docker image.

Each image will have its own directory. The image will be pushed to the telemetry-airflow repository in Dockerhub, but with the tag $DIRNAME. So for example:

/images
  /mozilla-schema-generator
    - Dockerfile
    - docker-compose.yml
    - requirements.txt

That image would be built and pushed to telemetry-airflow:mozilla-schema-generator. The PodOperator would use that tag to run the associated task.

cc @whd, @haroldwoo for feedback

main_summary.experiments_aggregates_import may overwrite production data in dev

email=["[email protected]", "[email protected]"],
env={"date": "{{ ds_nodash }}", "bucket": "{{ task.__class__.private_output_bucket }}"},
uri="https://raw.githubusercontent.com/mozilla/firefox-test-tube/master/notebook/import.py",

@robhudson is this job safe to run with a different bucket as a source i.e. telemetry-test-bucket on a local airflow instance? Or will it fail because the dev airflow doesn't have the right keys?

main_summary.clients_daily does not expose input-bucket

env=mozetl_envvar("clients_daily", {
    # Note that the output of this job will be earlier
    # than this date to account for submission latency.
    # See the clients_daily code in the python_mozetl
    # repo for more details.
    "date": "{{ ds }}",
    "output-bucket": "{{ task.__class__.private_output_bucket }}"
}),

See the option in mozetl:
https://github.com/mozilla/python_mozetl/blob/master/mozetl/clientsdaily/rollup.py#L148

This will run the full job if the dag is run from a local dev instance.

Task `crash_report_parquet` in socorro_import is failing

The create_dataproc_cluster task is failing at some point after dataproc_init.sh is called. You can see the requirements getting installed in the logs and then nothing after that:

https://workflow.telemetry.mozilla.org/log?task_id=crash_report_parquet&dag_id=socorro_import&execution_date=2020-12-06T00%3A00%3A00%2B00%3A00

DAG ID                               Task ID                  Execution date               Try number
-----------------------------------  -----------------------  -------------------------  ------------
socorro_import.crash_report_parquet  create_dataproc_cluster  2020-12-06 00:00:00+00:00             4
socorro_import.crash_report_parquet  run_dataproc_pyspark     2020-12-06 00:00:00+00:00             1
...
[2020-12-07 02:20:58,017] {taskinstance.py:1194} INFO - Marking task as FAILED. dag_id=socorro_import, task_id=crash_report_parquet, execution_date=20201206T000000, start_date=20201207T022052, end_date=20201207T022058
$ gsutil cat gs://moz-fx-data-prod-dataproc-scratch/google-cloud-dataproc-metainfo/21a4a832-667d-4b80-85c2-72d4076d6736/socorro-import-dataproc-cluster-m/dataproc-initialization-script-0_output
...
Requirement already satisfied: python-dateutil in /opt/conda/anaconda/lib/python3.6/site-packages (from arrow==0.10.0->-r /tmp/requirements.txt (line 1)) (2.8.1)
Requirement already satisfied: pytz>=2011k in /opt/conda/anaconda/lib/python3.6/site-packages (from pandas==0.23.4->-r /tmp/requirements.txt (line 8)) (2020.1)
Requirement already satisfied: setuptools in /opt/conda/anaconda/lib/python3.6/site-packages (from protobuf==3.6.1->-r /tmp/requirements.txt (line 15)) (50.3.0.post20201103)
Requirement already satisfied: py4j==0.10.7 in /opt/conda/anaconda/lib/python3.6/site-packages (from -r /tmp/requirements.txt (line 16)) (0.10.7)
Requirement already satisfied: python-dateutil in /opt/conda/anaconda/lib/python3.6/site-packages (from arrow==0.10.0->-r /tmp/requirements.txt (line 1)) (2.8.1)
Requirement already satisfied: urllib3<1.26,>=1.25.4 in /opt/conda/anaconda/lib/python3.6/site-packages (from botocore->-r /tmp/requirements.txt (line 4)) (1.25.11)
Requirement already satisfied: click==6.7 in /opt/conda/anaconda/lib/python3.6/site-packages (from -r /tmp/requirements.txt (line 5)) (6.7)
Requirement already satisfied: python-dateutil in /opt/conda/anaconda/lib/python3.6/site-packages (from arrow==0.10.0->-r /tmp/requirements.txt (line 1)) (2.8.1)
Requirement already satisfied: pytz>=2011k in /opt/conda/anaconda/lib/python3.6/site-packages (from pandas==0.23.4->-r /tmp/requirements.txt (line 8)) (2020.1)
Requirement already satisfied: setuptools in /opt/conda/anaconda/lib/python3.6/site-packages (from protobuf==3.6.1->-r /tmp/requirements.txt (line 15)) (50.3.0.post20201103)
Requirement already satisfied: py4j==0.10.7 in /opt/conda/anaconda/lib/python3.6/site-packages (from -r /tmp/requirements.txt (line 16)) (0.10.7)

It's been failing for 4 days now.

S3FSCheckSuccess fails when __HIVE_DEFAULT_PARTITION__ exists

def check_s3fs_success(bucket, prefix, num_partitions):
    """Check the s3 filesystem for the existence of `_SUCCESS` files in dataset partitions.

    :prefix: Bucket prefix of the table
    :num_partitions: Number of expected partitions
    """
    s3 = boto3.resource("s3")
    objects = s3.Bucket(bucket).objects.filter(Prefix=prefix)
    success = set([obj.key for obj in objects if "_SUCCESS" in obj.key])
    return len(success) == num_partitions

This fails when there is a __HIVE_DEFAULT_PARTITION__ in the dataset.

$aws s3 ls s3://telemetry-parquet/main_summary/v4/submission_date_s3=20190105/ --recursive | grep _SUCCESS | wc -l

101

This should be modified so that the number of partitions is treated as the minimum number of `_SUCCESS` partitions required, or so the check takes an explicit list of partitions (see the sketch below).
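A sketch of that change, treating num_partitions as a lower bound rather than an exact count so extra partitions such as __HIVE_DEFAULT_PARTITION__ no longer cause false failures; otherwise the logic matches the snippet above:

import boto3

def check_s3fs_success(bucket, prefix, min_partitions):
    """Check that at least `min_partitions` partitions contain a `_SUCCESS` file."""
    s3 = boto3.resource("s3")
    objects = s3.Bucket(bucket).objects.filter(Prefix=prefix)
    success = {obj.key for obj in objects if "_SUCCESS" in obj.key}
    return len(success) >= min_partitions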

Fix or deprecate EMRSparkOperator

When running a task via EMRSparkOperator (for example core_client_count_view or devtools_prerelease_events_to_amplitude) you get a warning like this:

[2019-05-26 01:35:53,442] {logging_mixin.py:95} WARNING - /usr/local/lib/python2.7/site-packages/airflow/models.py:1674: DeprecationWarning: BaseOperator.post_execute() now takes two arguments, `context` and `result`, but "core_client_count_view" only expected one. This behavior is deprecated and will be removed in a future version of Airflow. category=DeprecationWarning)

Looks like we need to update the post_execute method to take an additional argument.

main_summary.client_count_daily_view should use bucket envvar for input

env={"date": "{{ ds_nodash }}", "bucket": "{{ task.__class__.private_output_bucket }}"},
uri="https://raw.githubusercontent.com/mozilla/telemetry-airflow/master/jobs/client_count_daily_view.sh",

--files "s3://telemetry-parquet/main_summary/v4/" \

The full job will be run when the full main_summary dag is enabled. This can be fixed by changing the --files parameter to point to the right bucket.

fix requirements installed by dataproc_init.sh

dataproc_init.sh runs pip 20.3.1 with the new resolver, and the requirements it's trying to install don't gel well with one another, causing the resolver to take effectively infinite time to figure out a set of packages that meets the requirements.

#1198 covers changing dataproc_init.sh to use pip<20.3.0 which puts us back on the old resolver.

This issue covers fixing our requirements so that they don't cause the infinite-time-to-resolve problem.

Add mozetl-runner for external mozetl-compatible modules

See mozilla/python_mozetl#316.

The primary pain point in maintaining a mono-repo for the Python Spark jobs is dependency management. There is a new mozetl-databricks script in mozetl that can run compatible repos using a convention of passing arguments via the environment. As long as a module is consistent with existing conventions, it should be straightforward to run on the moz_databricks plugin.

This is the path of least resistance for scheduling https://github.com/wcbeard/bgbb_airflow, which includes some python3-only dependencies.

Specify exact container version for pipelines feeding public data

There's interest in developing some stronger safeguards around pipelines that feed publicly released data. Pipeline code generally does not live in this repository, but this repo does serve as a reasonable place for gatekeeping, since code generally has to be scheduled to run here.

To better ensure discipline that we are thinking through implications of making changes to published data, we should consider the following steps:

  • Isolate all pipelines that feed public data to a single DAG (public_data) that includes commentary about ensuring data review, etc.
  • Specify exact container versions to run within this DAG to lessen the chance of picking up code from another repository that has accidentally skipped data review
  • Protect changes to this DAG via a CODEOWNERS entry

cc @mreid-moz @scholtzan @fbertsch

Add license file

I've been looking for examples on how to use Airflow in practice and this repository is the best I've found. Thanks a lot!

I'm using this as the basis of a data pipeline for OpenTrials (https://github.com/opentrials/opentrials-airflow), as it's very well written and organised. As it's created by Mozilla and on GitHub, I'm assuming it's FLOSS. However, you don't specify a license, so I don't know under which terms. Could you add one?

Thanks again for the great work 👏

Create a "Wait for build" operator for dockerized components

Our general dockerized components arch looks like this:

  1. GH repo has the code and associated docker build components
  2. Circleci runs a build and deploy step on merge into master
  3. Artifacts are deployed to Dockerhub or Google Container Registry

For jobs that run dockerized components, we often merge a change and then rerun the job. However, the build usually isn't deployed at that point. We should have an operator that checks for a build step and, if one is in progress, waits for it before running the job.

Airflow DAGs are poorly named and formatted

In a moment of clarity, I realized that the DAGs are defined in python files. We should be using better variable names (e.g. not t1, t2, t3) and should take a look at formatting these definitions better.

For example, in the main_summary.py DAG t1 should be renamed to main_summary. Then we can say things like: search_daily_rollups.set_upstream(main_summary) instead of t5.set_upstream(t1).

main_summary.clients_daily_v6 does not expose input-bucket

env=tbv_envvar("com.mozilla.telemetry.views.ClientsDailyView", {
    "date": "{{ ds_nodash }}",
    "output-bucket": "{{ task.__class__.private_output_bucket }}"
}),

See source:
https://github.com/mozilla/telemetry-batch-view/blob/ded89d329a9a70d945b0fc58906db5b51b105658/src/main/scala/com/mozilla/telemetry/views/ClientsDailyView.scala#L20-L24

The job will read from telemetry-parquet instead of telemetry-test-bucket when run locally.

prio-processor dag fails to start gke cluster

Logs:

[2020-03-10 23:15:39,120] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: prio_processor.bootstrap.create_gke_cluster 2020-02-09T17:29:33.628418+00:00 [running]> airflow-prod-airflow-app-1-worker-0.airflow-prod-airflow-app-1-worker.prod-airflow.svc.cluster.local
[2020-03-10 23:15:39,152] {taskinstance.py:1088} ERROR - 'dict' object has no attribute 'name'
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/site-packages/airflow/models/taskinstance.py", line 955, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/gcp_container_operator.py", line 177, in execute
    self._check_input()
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/gcp_container_operator.py", line 168, in _check_input
    elif self.body.name and self.body.initial_node_count:
AttributeError: 'dict' object has no attribute 'name'

The underlying airflow code has changed, causing failures in starting a cluster.

CODE_OF_CONDUCT.md file missing

As of January 1 2019, Mozilla requires that all GitHub projects include this CODE_OF_CONDUCT.md file in the project root. The file has two parts:

  1. Required Text - All text under the headings Community Participation Guidelines and How to Report, are required, and should not be altered.
  2. Optional Text - The Project Specific Etiquette heading provides a space to speak more specifically about ways people can work effectively and inclusively together. Some examples of those can be found on the Firefox Debugger project, and Common Voice. (The optional part is commented out in the raw template file, and will not be visible until you modify and uncomment that part.)

If you have any questions about this file, or Code of Conduct policies and procedures, please see Mozilla-GitHub-Standards or email [email protected].

(Message COC001)

Jobs run with GKEOperator need `get_logs=False`, otherwise job is likely to fail unless constantly logging to standard out

I noticed this while working on adding the missioncontrol-etl job (#840), but apparently this happened with the probe scraper as well.

tl;dr: a job must print something to standard out / error every 30 seconds or so, or else it will fail with a mysterious error saying IncompleteRead:

https://issues.apache.org/jira/browse/AIRFLOW-3534

I'm not sure if there's an easy / good workaround here. The function that's causing the problem is called read_namespaced_pod_log, which (AFAICT) is using a persistently opened http connection in Kubernetes to read the log under the hood:

https://github.com/apache/airflow/blob/c890d066965aa9dbf3016f41cfae45e9a084478a/airflow/kubernetes/pod_launcher.py#L173

I did some spelunking in the kubernetes python repository + issue tracker, and to be honest it doesn't seem like this type of use case is really taken into account with the API. There is no way to pick up the logs again in the event of a timeout or similar; see for example this issue comment:

kubernetes-client/python#199 (comment)

The workaround is just to not get the logs and rely on stackdriver logging. This is pretty non-ideal: it increases the amount of filtering/spelunking you would need to do pretty significantly in the case that something goes wrong. Filing this issue for internal visibility, as it's a pretty serious gotcha.
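A rough sketch of the workaround on a pod-based task (Airflow 1.10-era imports, matching the tracebacks elsewhere on this page; every identifier below is a placeholder, not the real missioncontrol-etl configuration):

from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.gcp_container_operator import GKEPodOperator

with DAG("example_quiet_job", start_date=datetime(2020, 1, 1), schedule_interval="@daily") as dag:
    quiet_job = GKEPodOperator(
        task_id="quiet_job",
        project_id="my-sandbox-project",    # placeholder
        location="us-central1-a",           # placeholder
        cluster_name="my-gke-cluster",      # placeholder
        name="quiet-job",
        namespace="default",
        image="mozilla/example-etl:latest", # placeholder
        get_logs=False,  # the workaround: rely on Stackdriver instead of streaming pod logs
    )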

glam_org_mozilla_fenix dag failing on gke_command

run_sql = gke_command(
    task_id="run_sql",
    cmds=["bash"],
    env_vars={"DATASET": "glam_etl", "SUBMISSION_DATE": "{{ ds }}"},
    command=["script/glam/run_fenix_sql"],
    docker_image="mozilla/bigquery-etl:latest",
    gcp_conn_id="google_cloud_glam_fenix_dev",
    dag=dag,
)

[2020-02-28 22:47:03,770] {taskinstance.py:1088} ERROR - Command '['gcloud', 'container', 'clusters', 'get-credentials', u'bq-load-gke-1', '--zone', u'us-central1-a', '--project', u'fenix-glam-dev']' returned non-zero exit status 1
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/site-packages/airflow/models/taskinstance.py", line 955, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/app/pvmount/telemetry-airflow/dags/operators/gcp_container_operator.py", line 89, in execute
    "--project", self.project_id])
  File "/usr/local/lib/python2.7/subprocess.py", line 190, in check_call
    raise CalledProcessError(retcode, cmd)
CalledProcessError: Command '['gcloud', 'container', 'clusters', 'get-credentials', u'bq-load-gke-1', '--zone', u'us-central1-a', '--project', u'fenix-glam-dev']' returned non-zero exit status 1

The gcp_conn_id may not have access to run in the cluster. However, I would like the container to assume the credentials for the glam-fenix-dev project.

Fail CI on non-parseable DAGs

We may have an issue to capture this already, but I merged in a PR today that broke a bunch of dags because of a syntax error (use of a Python 3 feature). We should at the very least be able to catch these syntax errors, and ideally run a smoke test task from the command line as part of CI.

Airflow reports it `cannot import name bigquery_etl_copy_deduplicate`

WTMO is displaying two banners with the following message today (2019-08-01):

Broken DAG: [/app/pvmount/telemetry-airflow/dags/copy_deduplicate.py] cannot import name bigquery_etl_copy_deduplicate
Broken DAG: [/app/pvmount/telemetry-airflow/dags/main_summary.py] cannot import name bigquery_etl_copy_deduplicate 

Consider passing AWS credential environment variables through via docker-compose

Per this comment, it may be preferable to specify the AWS credential environment variables to be passed through in the docker-compose.yml file rather than only in that one target within the Makefile.

In the past, I've relied on the lack of credentials to prevent launching tons of EMR jobs when starting up airflow locally, though in modern times DAGs automatically start up disabled IIUC.

Update readme for creating a new dag instance

The UI has changed and we can't create a new DAG instance there. The readme should reflect this if that is indeed the case. We'll update this after the dag creation for the fx_usage_report.

main_summary.(search_dashboard | search_clients_daily) does not expose input_bucket

email=["[email protected]", "[email protected]", "[email protected]"],
env=mozetl_envvar("search_dashboard", {
    "submission_date": "{{ ds_nodash }}",
    "bucket": "{{ task.__class__.private_output_bucket }}",
    "prefix": "harter/searchdb",
    "save_mode": "overwrite"
}),

The code for the mozetl job is found here:
https://github.com/mozilla/python_mozetl/blob/0f8189f87f857f43e9c0142f9c612a0bcc28978c/mozetl/search/aggregates.py#L275-L282

The full job will run when the entire dag is run, instead of reading from telemetry-test-bucket.

Several broken dags due locally to missing `google_cloud_derived_datasets`

Broken DAG: [/app/dags/spinner_severity_generator.py] Unable to locate credentials
Broken DAG: [/app/dags/crash_summary.py] The conn_id `google_cloud_derived_datasets` isn't defined
Broken DAG: [/app/dags/sync_view.py] The conn_id `google_cloud_derived_datasets` isn't defined
Broken DAG: [/app/dags/mango_log_processing.py] The conn_id `google_cloud_derived_datasets` isn't defined
Broken DAG: [/app/dags/first_shutdown_summary.py] The conn_id `google_cloud_derived_datasets` isn't defined
Broken DAG: [/app/dags/landfill.py] Unable to locate credentials 

This makes testing locally on dags like main_summary difficult. There should be a dummy credential so this works correctly.
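A sketch of what such a dummy entry might look like in resources/dev_connections.env, assuming Airflow's standard AIRFLOW_CONN_* environment-variable convention and the URI scheme used for Google Cloud connections (a real deployment would add a key file or project via the connection's extras):

AIRFLOW_CONN_GOOGLE_CLOUD_DERIVED_DATASETS=google-cloud-platform://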

[meta] Use correct buckets for `main_summary` dependencies when running in dev/staging

I wanted to verify the main_summary dag to test #297 of making the default EMR release 5.13.0. I followed the instructions added in #295 for running the main_summary dag, but I noticed that there are quite a few jobs that don't accept the right input bucket from the environment.

  • #308 engagement_ratio
  • #309 addon_aggregates
  • #310 experiment_aggregates (?)
  • #311 search_dashboard / search_clients_daily
  • #312 clients_daily (also possibly deprecated)
  • #313 clients_daily_v6
  • #314 retention
  • #315 client_count_daily_view
  • experiments_error_aggregates

Fixing these would make this dag moderately better for local, end-to-end testing of changes. Some issues look like they require slightly more effort than others.

main_summary.engagement_ratio does not expose input/output bucket

engagement_ratio = EMRSparkOperator(
    task_id="engagement_ratio",
    job_name="Update Engagement Ratio",
    execution_timeout=timedelta(hours=6),
    instance_count=10,
    uri="https://raw.githubusercontent.com/mozilla/telemetry-airflow/master/jobs/engagement_ratio.sh",
    output_visibility="public",
    dag=dag)

This job does not accept the bucket location from the environment, which breaks integration testing.
The engagement ratio job is also not exposed to the mozetl cli, and can't be run using mozetl_submit.

Document workaround for permission errors on Linux

Because of Dockerflow, the container user id is 10001, while the project folder on the host is generally set to have permissions for the current user.

This can be solved by replacing all instances of the container user id with the current user id.

$ sed -i "s/10001/$(id -u)/g" Dockerfile.dev 
