mozilla / telemetry-airflow
Airflow configuration for Telemetry
Home Page: https://workflow.telemetry.mozilla.org/
License: Mozilla Public License 2.0
In a moment of clarity, I realized that the DAGs are defined in Python files. We should be using better variable names (e.g. not t1, t2, t3) and should take a look at formatting these definitions better.
For example, in the main_summary.py DAG, t1 should be renamed to main_summary. Then we can say things like search_daily_rollups.set_upstream(main_summary) instead of t5.set_upstream(t1).
telemetry-airflow/dags/main_summary.py
Lines 199 to 206 in 1d2cc3b
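As a sketch of the idea (using a minimal stand-in class rather than real Airflow operators, so only the naming convention is the point here):

```python
class Task:
    """Minimal stand-in for an Airflow operator, for illustration only."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = []

    def set_upstream(self, other):
        self.upstream.append(other)


# Descriptive names instead of t1, t5, ...
main_summary = Task("main_summary")
search_daily_rollups = Task("search_daily_rollups")

# Reads as documentation of the dependency graph:
search_daily_rollups.set_upstream(main_summary)
```

With real operators the change is purely a rename; the DAG structure is unaffected.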
See the option in mozetl:
https://github.com/mozilla/python_mozetl/blob/master/mozetl/clientsdaily/rollup.py#L148
This will run the full job if the DAG is run from a local dev instance. This breaks bash jobs if they have optional arguments from the command line.
Unfinished PR from Victor: #1117
telemetry-airflow/plugins/s3fs_check_success.py
Lines 9 to 19 in 9c14df6
This fails when there is a __HIVE_DEFAULT_PARTITION__ in the dataset.
$aws s3 ls s3://telemetry-parquet/main_summary/v4/submission_date_s3=20190105/ --recursive | grep _SUCCESS | wc -l
101
This should be modified so the number of partitions is either the minimum required number of _SUCCESS partitions to be written, or an explicit list of partitions.
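A sketch of that check, assuming we already have the list of keys under the dataset prefix (the key layout and threshold are illustrative):

```python
def success_partitions_ok(keys, min_partitions):
    """Count partitions that wrote a _SUCCESS marker, skipping the
    __HIVE_DEFAULT_PARTITION__ placeholder, and require a minimum."""
    success = {
        key.rsplit("/", 2)[-2]  # the partition directory name
        for key in keys
        if key.endswith("_SUCCESS")
        and "__HIVE_DEFAULT_PARTITION__" not in key
    }
    return len(success) >= min_partitions


keys = [
    "main_summary/v4/submission_date_s3=20190105/sample_id=1/_SUCCESS",
    "main_summary/v4/submission_date_s3=20190105/sample_id=2/_SUCCESS",
    "main_summary/v4/submission_date_s3=20190105/sample_id=__HIVE_DEFAULT_PARTITION__/_SUCCESS",
]
```

An explicit partition list could replace the count threshold with a set-difference check.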
telemetry-airflow/dags/main_summary.py
Lines 155 to 157 in 1d2cc3b
@robhudson is this job safe to run with a different bucket as a source (i.e. telemetry-test-bucket) on a local Airflow instance? Or will it fail because the dev Airflow doesn't have the right keys?
This line could cause the S3 sensor to incorrectly fail on datasets where the number of physical partitions exceeds 1000 (the S3 list API returns at most 1000 keys per page). In particular, this would affect main_summary_all_histograms. To check the number of partitions:
aws s3 ls s3://BUCKET/PREFIX/submission_date_s3=PART_ID --recursive | wc -l
See mozilla/python_mozetl#316.
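The fix is to page through the listing rather than trusting a single response. With boto3 this would go through get_paginator("list_objects_v2"); the counting logic, shown here over stubbed response pages shaped like list_objects_v2 results, is simply:

```python
def count_keys(pages):
    """Sum object counts across all response pages, so datasets with
    more than 1000 keys (one S3 list page) are counted correctly."""
    return sum(len(page.get("Contents", [])) for page in pages)


# Stubbed pages shaped like list_objects_v2 responses:
pages = [
    {"Contents": [{"Key": f"part-{i:05d}"} for i in range(1000)]},
    {"Contents": [{"Key": "part-01000"}]},
]
```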
The primary pain point in maintaining a mono-repo for the Python Spark jobs is dependency management. There is a new mozetl-databricks script in mozetl that can run compatible repos using a convention of passing arguments via the environment. As long as a module is consistent with existing conventions, it should be straightforward to run on the moz_databricks plugin.
This is the path of least resistance for scheduling https://github.com/wcbeard/bgbb_airflow, which includes some python3-only dependencies.
The boto3 object causes the MozDatabrickOperator to fail during DAG construction in the dev environment if the corresponding environment variables are not defined:
telemetry-airflow/plugins/moz_databricks.py
Line 177 in b8a4407
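One common fix, sketched here without real boto3 (the client factory is a stand-in), is to create the client lazily inside execute() instead of at DAG-parse time:

```python
import os


class LazyClientOperator:
    """Stand-in operator: defers client creation from construction
    (DAG parse time) to execution, so missing credentials in a dev
    environment don't break DAG import."""

    def __init__(self):
        self._client = None  # not created yet -- construction is safe

    @property
    def client(self):
        if self._client is None:
            # Only now do we require the environment to be configured.
            if "AWS_ACCESS_KEY_ID" not in os.environ:
                raise RuntimeError("AWS credentials not configured")
            self._client = object()  # boto3.client("s3") in real code
        return self._client

    def execute(self):
        return self.client
```

Constructing the operator (what the scheduler does when parsing DAG files) never touches credentials; only a real task run does.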
There is now a tmp dataset for temporary tables used in intermediate operations. We should utilize that dataset for the export.
When running a task via EMRSparkOperator (for example core_client_count_view or devtools_prerelease_events_to_amplitude) you get a warning like this:
[2019-05-26 01:35:53,442] {logging_mixin.py:95} WARNING - /usr/local/lib/python2.7/site-packages/airflow/models.py:1674: DeprecationWarning: BaseOperator.post_execute() now takes two arguments, `context` and `result`, but "core_client_count_view" only expected one. This behavior is deprecated and will be removed in a future version of Airflow. category=DeprecationWarning)
Looks like we need to update the post_execute method to take an additional argument.
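The updated hook would look something like this (toy class shown instead of Airflow's BaseOperator; the default on result keeps it compatible with callers that still pass one argument):

```python
class MyOperator:
    """Sketch of the new-style post_execute signature."""

    def post_execute(self, context, result=None):
        # Airflow now passes both the task context and the
        # task's return value.
        return {"context": context, "result": result}
```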
The UI has changed and we can't create a new DAG instance there. The README should reflect this if that is indeed the case. We'll update this after the DAG creation for the fx_usage_report.
Please add documentation about how to use tbv_envvar and telemetry_batch_view.py.
telemetry-airflow/dags/main_summary.py
Lines 56 to 63 in 1d2cc3b
This job does not accept the bucket location from the environment, which breaks integration testing. The engagement ratio job is also not exposed to the mozetl CLI, and can't be run using mozetl_submit.
There's interest in developing some stronger safeguards around pipelines that feed publicly released data. Pipeline code generally does not live in this repository, but this repo does serve as a reasonable place for gatekeeping, since code generally has to be scheduled to run here.
To better ensure discipline that we are thinking through implications of making changes to published data, we should consider the following steps:
public_data) that includes commentary about ensuring data review, etc.
telemetry-airflow/dags/main_summary.py
Lines 98 to 101 in 1d2cc3b
The default bucket is set to telemetry-parquet, which breaks integration testing on a small sample locally.
The following environment variable should be set: AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: "False"
The create_dataproc_cluster task is failing at some point after dataproc_init.sh is called. You can see the requirements getting installed in the logs and then nothing after that:
DAG ID Task ID Execution date Try number
----------------------------------- ----------------------- ------------------------- ------------
socorro_import.crash_report_parquet create_dataproc_cluster 2020-12-06 00:00:00+00:00 4
socorro_import.crash_report_parquet run_dataproc_pyspark 2020-12-06 00:00:00+00:00 1
...
[2020-12-07 02:20:58,017] {taskinstance.py:1194} INFO - Marking task as FAILED. dag_id=socorro_import, task_id=crash_report_parquet, execution_date=20201206T000000, start_date=20201207T022052, end_date=20201207T022058
$ gsutil cat gs://moz-fx-data-prod-dataproc-scratch/google-cloud-dataproc-metainfo/21a4a832-667d-4b80-85c2-72d4076d6736/socorro-import-dataproc-cluster-m/dataproc-initialization-script-0_output
...
Requirement already satisfied: python-dateutil in /opt/conda/anaconda/lib/python3.6/site-packages (from arrow==0.10.0->-r /tmp/requirements.txt (line 1)) (2.8.1)
Requirement already satisfied: pytz>=2011k in /opt/conda/anaconda/lib/python3.6/site-packages (from pandas==0.23.4->-r /tmp/requirements.txt (line 8)) (2020.1)
Requirement already satisfied: setuptools in /opt/conda/anaconda/lib/python3.6/site-packages (from protobuf==3.6.1->-r /tmp/requirements.txt (line 15)) (50.3.0.post20201103)
Requirement already satisfied: py4j==0.10.7 in /opt/conda/anaconda/lib/python3.6/site-packages (from -r /tmp/requirements.txt (line 16)) (0.10.7)
Requirement already satisfied: urllib3<1.26,>=1.25.4 in /opt/conda/anaconda/lib/python3.6/site-packages (from botocore->-r /tmp/requirements.txt (line 4)) (1.25.11)
Requirement already satisfied: click==6.7 in /opt/conda/anaconda/lib/python3.6/site-packages (from -r /tmp/requirements.txt (line 5)) (6.7)
It's been failing for 4 days now.
Per this comment, it may be preferable to specify the AWS credential environment variables to be passed through in the docker-compose.yml file rather than only in that one target within the Makefile.
In the past, I've relied on the lack of credentials to prevent launching tons of EMR jobs when starting up Airflow locally, though in modern times DAGs automatically start disabled, IIUC.
That function is used in one place, so it should live in its own script. That script should be called by bin/run.
As of January 1, 2019, Mozilla requires that all GitHub projects include this CODE_OF_CONDUCT.md file in the project root. The file has two parts:
If you have any questions about this file, or Code of Conduct policies and procedures, please see Mozilla-GitHub-Standards or email [email protected].
(Message COC001)
My dashboard that visualizes the data from the tab_spinner job hasn't gotten new data since January 27th. Any idea what's going on?
telemetry-airflow/dags/main_summary.py
Lines 246 to 247 in 1d2cc3b
The full job will be run when the full main_summary DAG is enabled. This can be fixed by changing the --files parameter to point to the right bucket.
telemetry-airflow/dags/main_summary.py
Lines 231 to 235 in 1d2cc3b
https://github.com/mozilla/python_mozetl/blob/master/mozetl/engagement/retention/job.py#L148
The job will run from the production bucket instead of the test bucket when running the entire dag locally.
Broken DAG: [/app/dags/spinner_severity_generator.py] Unable to locate credentials
Broken DAG: [/app/dags/crash_summary.py] The conn_id `google_cloud_derived_datasets` isn't defined
Broken DAG: [/app/dags/sync_view.py] The conn_id `google_cloud_derived_datasets` isn't defined
Broken DAG: [/app/dags/mango_log_processing.py] The conn_id `google_cloud_derived_datasets` isn't defined
Broken DAG: [/app/dags/first_shutdown_summary.py] The conn_id `google_cloud_derived_datasets` isn't defined
Broken DAG: [/app/dags/landfill.py] Unable to locate credentials
This makes testing locally on DAGs like main_summary difficult. There should be a dummy credential so this works correctly.
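One way to stub these out relies on Airflow's standard AIRFLOW_CONN_&lt;CONN_ID&gt; environment-variable mechanism; the credential-less URI below is a placeholder, just enough for DAG files to import:

```python
import os

# Airflow resolves connections from AIRFLOW_CONN_<CONN_ID> environment
# variables before consulting its metadata DB, so a placeholder URI is
# enough for DAG parsing in a dev environment.
os.environ["AIRFLOW_CONN_GOOGLE_CLOUD_DERIVED_DATASETS"] = (
    "google-cloud-platform://"
)
```

The same pattern would cover the AWS-credential DAGs with a dummy aws:// connection, set in docker-compose.yml rather than in code.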
We may have an issue to capture this already, but I merged in a PR today that broke a bunch of DAGs because of a syntax error (use of a Python 3 feature). We should at the very least be able to catch these syntax errors, and ideally run a smoke test task from the command line as part of CI.
WTMO is displaying two banners with the following message today (2019-08-01):
Broken DAG: [/app/pvmount/telemetry-airflow/dags/copy_deduplicate.py] cannot import name bigquery_etl_copy_deduplicate
Broken DAG: [/app/pvmount/telemetry-airflow/dags/main_summary.py] cannot import name bigquery_etl_copy_deduplicate
telemetry-airflow/dags/main_summary.py
Lines 166 to 172 in 1d2cc3b
The code for the mozetl job is found here:
https://github.com/mozilla/python_mozetl/blob/0f8189f87f857f43e9c0142f9c612a0bcc28978c/mozetl/search/aggregates.py#L275-L282
The full job will get run when running the entire DAG, instead of from telemetry-test-bucket.
Because of Dockerflow, the container user id is 10001, while the folder is generally set to have permissions for the current user.
This can be solved by replacing all instances of the container user id with the current user id:
$ sed -i "s/10001/$(id -u)/g" Dockerfile.dev
The make run command does not work properly because the MySQL database takes too long to respond. The wait script does not check whether the database is accepting connections, so Airflow ends up making queries before it is ready.
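A generic readiness wait, sketched here with a plain predicate (a real check would attempt a TCP connection to the MySQL port or run mysqladmin ping):

```python
import time


def wait_for(check, timeout=60.0, interval=1.0):
    """Poll `check` until it returns True or `timeout` seconds pass."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if check():
            return True
        time.sleep(interval)
    return False
```

The wait script would call this before handing control to the Airflow entrypoint, failing fast with a clear error if the DB never comes up.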
The PodOperator is great because it allows us to use specific environments for different jobs. The idea here is for us to store Airflow job Dockerfiles in telemetry-airflow. These will be built alongside the CI. We can optionally build them in CI nightly, or as an Airflow step, if we need that.
These images will live in /images, which does not need to be included in the telemetry-airflow:latest Docker image.
Each image will have its own directory. The image will be pushed to the telemetry-airflow repository on Dockerhub, but with the tag $DIRNAME. So for example:
/images
  /mozilla-schema-generator
    - Dockerfile
    - docker-compose.yml
    - requirements.txt
That image would be built and pushed to telemetry-airflow:mozilla-schema-generator. The PodOperator would use that tag to run the associated task.
cc @whd, @haroldwoo for feedback
Logs:
[2020-03-10 23:15:39,120] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: prio_processor.bootstrap.create_gke_cluster 2020-02-09T17:29:33.628418+00:00 [running]> airflow-prod-airflow-app-1-worker-0.airflow-prod-airflow-app-1-worker.prod-airflow.svc.cluster.local
[2020-03-10 23:15:39,152] {taskinstance.py:1088} ERROR - 'dict' object has no attribute 'name'
Traceback (most recent call last):
File "/usr/local/lib/python2.7/site-packages/airflow/models/taskinstance.py", line 955, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/gcp_container_operator.py", line 177, in execute
self._check_input()
File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/gcp_container_operator.py", line 168, in _check_input
elif self.body.name and self.body.initial_node_count:
AttributeError: 'dict' object has no attribute 'name'
The underlying Airflow code has changed, causing failures when starting a cluster.
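The operator's _check_input now does attribute access (self.body.name) while our DAG passes a plain dict. A small normalization shim, illustrative only (field names taken from the traceback), shows the shape of a fix:

```python
def cluster_field(body, field):
    """Read a field from a cluster body that may be a plain dict
    (as our DAGs pass) or an object with attributes (as the newer
    Airflow operator expects)."""
    if isinstance(body, dict):
        return body.get(field)
    return getattr(body, field, None)


body = {"name": "bq-load-gke-1", "initial_node_count": 2}
```

The real fix is to either pass the body in the type the new operator expects, or pin the operator version; the shim just makes the mismatch concrete.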
I've been looking for examples on how to use Airflow in practice and this repository is the best I've found. Thanks a lot!
I'm using this as the basis of a data pipeline for OpenTrials (https://github.com/opentrials/opentrials-airflow), as it's very well written and organised. As it's created by Mozilla and on GitHub, I'm assuming it's FLOSS. However, you don't specify a license, so I don't know under which terms. Could you add one?
Thanks again for the great work!
We would like what is initialized in bin/run to be given some default values, rather than left totally empty (specifically the JSON key field).
cc @acmiyaguchi
Unfinished PR from Victor: #1137
Our general dockerized-components architecture looks like this:
For jobs that run dockerized components, we often merge a change and then rerun the job. However, the build usually isn't deployed at that point. We should have an operator that checks for an in-progress build step and, if one is happening, waits for it before running the job.
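A sketch of that operator's core logic, with get_build_status as a hypothetical callable (in reality it might query the CI or container-registry API):

```python
import time


def wait_for_build(get_build_status, timeout=600, interval=10):
    """Block until the current build (if any) finishes.

    `get_build_status` is a stand-in returning "building", "deployed",
    or "failed".
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = get_build_status()
        if status == "deployed":
            return True
        if status == "failed":
            raise RuntimeError("upstream build failed")
        time.sleep(interval)  # still building; poll again
    raise TimeoutError("build did not finish in time")
```

In Airflow terms this maps naturally onto a sensor that pokes before the job task runs.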
telemetry-airflow/dags/main_summary.py
Lines 217 to 220 in 1d2cc3b
The job will run from telemetry-parquet instead of telemetry-test-bucket when run locally.
This will enable the cache to persist between runs.
[2020-10-23 01:13:41,522] {logging_mixin.py:112} INFO - [2020-10-23 01:13:41,521] {pod_launcher.py:125} INFO - b'gcp:telemetry.event was not found in location US\n'
[2020-10-23 01:13:41,680] {logging_mixin.py:112} INFO - [2020-10-23 01:13:41,680] {pod_launcher.py:125} INFO - b'Traceback (most recent call last):\n'
[2020-10-23 01:13:41,681] {logging_mixin.py:112} INFO - [2020-10-23 01:13:41,681] {pod_launcher.py:125} INFO - b' File "/usr/local/lib/python3.8/runpy.py", line 194, in _run_module_as_main\n'
[2020-10-23 01:13:41,681] {logging_mixin.py:112} INFO - [2020-10-23 01:13:41,681] {pod_launcher.py:125} INFO - b' return _run_code(code, main_globals, None,\n'
[2020-10-23 01:13:41,681] {logging_mixin.py:112} INFO - [2020-10-23 01:13:41,681] {pod_launcher.py:125} INFO - b' File "/usr/local/lib/python3.8/runpy.py", line 87, in _run_code\n'
[2020-10-23 01:13:41,681] {logging_mixin.py:112} INFO - [2020-10-23 01:13:41,681] {pod_launcher.py:125} INFO - b' exec(code, run_globals)\n'
[2020-10-23 01:13:41,681] {logging_mixin.py:112} INFO - [2020-10-23 01:13:41,681] {pod_launcher.py:125} INFO - b' File "/app/bigquery_etl/run_query.py", line 107, in \n'
[2020-10-23 01:13:41,682] {logging_mixin.py:112} INFO - [2020-10-23 01:13:41,682] {pod_launcher.py:125} INFO - b' main()\n'
[2020-10-23 01:13:41,682] {logging_mixin.py:112} INFO - [2020-10-23 01:13:41,682] {pod_launcher.py:125} INFO - b' File "/app/bigquery_etl/run_query.py", line 97, in main\n'
[2020-10-23 01:13:41,682] {logging_mixin.py:112} INFO - [2020-10-23 01:13:41,682] {pod_launcher.py:125} INFO - b' run(\n'
[2020-10-23 01:13:41,682] {logging_mixin.py:112} INFO - [2020-10-23 01:13:41,682] {pod_launcher.py:125} INFO - b' File "/app/bigquery_etl/run_query.py", line 91, in run\n'
[2020-10-23 01:13:41,682] {logging_mixin.py:112} INFO - [2020-10-23 01:13:41,682] {pod_launcher.py:125} INFO - b' subprocess.check_call(["bq"] + query_arguments, stdin=query_stream)\n'
[2020-10-23 01:13:41,682] {logging_mixin.py:112} INFO - [2020-10-23 01:13:41,682] {pod_launcher.py:125} INFO - b' File "/usr/local/lib/python3.8/subprocess.py", line 364, in check_call\n'
[2020-10-23 01:13:41,682] {logging_mixin.py:112} INFO - [2020-10-23 01:13:41,682] {pod_launcher.py:125} INFO - b' raise CalledProcessError(retcode, cmd)\n'
[2020-10-23 01:13:41,683] {logging_mixin.py:112} INFO - [2020-10-23 01:13:41,683] {pod_launcher.py:125} INFO - b"subprocess.CalledProcessError: Command '['bq', 'query', '--project_id=dataset', '--parameter=submission_date:DATE:2020-10-22', '--schema_update_option=ALLOW_FIELD_ADDITION', '--dataset_id=telemetry_derived', '--destination_table=event_events_v1$20201022']' returned non-zero exit status 2.\n"
[2020-10-23 01:13:42,732] {logging_mixin.py:112} INFO - [2020-10-23 01:13:42,731] {pod_launcher.py:142} INFO - Event: event-events-66c5f787 had an event of type Failed
[2020-10-23 01:13:42,783] {taskinstance.py:1145} ERROR - Pod Launching failed: Pod returned a failure: failed
Traceback (most recent call last):
File "/app/dags/operators/backport/kubernetes_pod_operator_1_10_7.py", line 268, in execute
'Pod returned a failure: {state}'.format(state=final_state)
airflow.exceptions.AirflowException: Pod returned a failure: failed
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 983, in _run_raw_task
result = task_copy.execute(context=context)
File "/app/dags/operators/gcp_container_operator.py", line 98, in execute
result = super(UpstreamGKEPodOperator, self).execute(context) # Moz specific
File "/app/dags/operators/backport/kubernetes_pod_operator_1_10_7.py", line 273, in execute
raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod returned a failure: failed
[2020-10-23 01:13:42,785] {taskinstance.py:1187} INFO - All retries failed; marking task as FAILED.dag_id=copy_deduplicate, task_id=event_events, execution_date=20201022T010000, start_date=20201023T011333, end_date=20201023T011342
I wanted to verify the main_summary DAG to test #297, which makes the default EMR release 5.13.0. I followed the instructions added in #295 for running the main_summary DAG, but I noticed that there are quite a few jobs that don't accept the right input bucket from the environment.
Fixing these would make this DAG considerably better for testing end-to-end changes locally. Some issues look like they require slightly more effort than others.
telemetry-airflow/dags/utils/gke.py
Lines 23 to 26 in 6606e78
This setting in cluster creation breaks the local development workflow. The cluster will be created, but subsequent uses of the PodOperator from a local machine will not be able to authenticate against the master node to run commands.
dataproc_init.sh runs pip 20.3.1 with the new dependency resolver, and the requirements it is trying to install conflict with one another, causing the resolver to take effectively infinite time to find a set of packages that satisfies them.
#1198 covers changing dataproc_init.sh to use pip<20.3.0, which puts us back on the old resolver.
This issue covers fixing our requirements so that they don't cause the infinite-resolution problem.
I noticed this while working on adding the missioncontrol-etl job (#840), but apparently this happened with the probe scraper as well.
tl;dr: a job must print something to standard out/error every 30 seconds or so, or else it will fail with a mysterious IncompleteRead error:
https://issues.apache.org/jira/browse/AIRFLOW-3534
I'm not sure if there's an easy / good workaround here. The function that's causing the problem is called read_namespaced_pod_log, which (AFAICT) is using a persistently opened http connection in Kubernetes to read the log under the hood:
I did some spelunking in the kubernetes python repository + issue tracker, and to be honest it doesn't seem like this type of use case is really taken into account with the API. There is no way to pick up the logs again in the event of a timeout or similar; see for example this issue comment:
kubernetes-client/python#199 (comment)
The workaround is just to not get the logs and rely on stackdriver logging. This is pretty non-ideal: it increases the amount of filtering/spelunking you would need to do pretty significantly in the case that something goes wrong. Filing this issue for internal visibility, as it's a pretty serious gotcha.
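A crude job-side mitigation (assuming the job can tolerate extra log lines) is a background heartbeat that emits output on an interval shorter than the read timeout; the class below is a sketch:

```python
import sys
import threading


class Heartbeat:
    """Emit a line on an interval so the pod-log reader never sees a
    30-second silence (sketch of a job-side workaround)."""

    def __init__(self, interval=25.0, write=None):
        self.interval = interval
        self.write = write or (lambda: print("heartbeat", file=sys.stderr))
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def _run(self):
        # Event.wait returns False on timeout, so each timeout emits
        # one heartbeat until stop is set.
        while not self._stop.wait(self.interval):
            self.write()

    def __enter__(self):
        self._thread.start()
        return self

    def __exit__(self, *exc):
        self._stop.set()
        self._thread.join()
```

This doesn't fix the underlying client behavior, but it keeps the persistent log connection fed during long silent phases of a job.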
telemetry-airflow/dags/utils/status.py
Lines 4 to 7 in a74f4e7
This function should register a dataset, regardless of its execution status.
See #982 (comment)
We have multiple places now where we set a group of options together in order to ensure bigquery_etl_query replaces a whole table rather than just a partition. It's becoming a common-enough pattern that it should have direct support.
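A helper capturing that grouping could look like the following sketch; the kwarg names mirror what such a wrapper might accept and should be treated as assumptions rather than the plugin's actual signature:

```python
def full_table_write_kwargs(destination_table, dataset_id):
    """Bundle the options we currently repeat by hand whenever a
    query should replace the whole table instead of one partition.
    Names here are illustrative, not the plugin's real API."""
    return {
        "destination_table": destination_table,
        "dataset_id": dataset_id,
        # No date partition parameter: operate on the whole table.
        "date_partition_parameter": None,
        # Ask bq to replace the destination outright.
        "arguments": ("--replace",),
    }
```

Direct support in bigquery_etl_query (e.g. a single full_table=True flag) would make the intent explicit at each call site.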
In order to avoid things like #144, we can have an automated test run each DAG as a Python script and fail if there are any errors.
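A minimal version of that check compiles each DAG file and collects errors; a fuller test would import the files through Airflow's DagBag, but even compile-level checking catches mistakes like Python 3 syntax in a Python 2 deployment:

```python
import pathlib


def find_broken_dags(dag_dir):
    """Compile every .py file under dag_dir; return {path: error}."""
    errors = {}
    for path in sorted(pathlib.Path(dag_dir).glob("**/*.py")):
        try:
            compile(path.read_text(), str(path), "exec")
        except SyntaxError as exc:
            errors[str(path)] = str(exc)
    return errors
```

CI would simply fail the build when the returned dict is non-empty, printing each path and its error.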
telemetry-airflow/dags/glam_org_mozilla_fenix.py
Lines 34 to 42 in 7f0f9e6
[2020-02-28 22:47:03,770] {taskinstance.py:1088} ERROR - Command '['gcloud', 'container', 'clusters', 'get-credentials', u'bq-load-gke-1', '--zone', u'us-central1-a', '--project', u'fenix-glam-dev']' returned non-zero exit status 1
Traceback (most recent call last):
File "/usr/local/lib/python2.7/site-packages/airflow/models/taskinstance.py", line 955, in _run_raw_task
result = task_copy.execute(context=context)
File "/app/pvmount/telemetry-airflow/dags/operators/gcp_container_operator.py", line 89, in execute
"--project", self.project_id])
File "/usr/local/lib/python2.7/subprocess.py", line 190, in check_call
raise CalledProcessError(retcode, cmd)
CalledProcessError: Command '['gcloud', 'container', 'clusters', 'get-credentials', u'bq-load-gke-1', '--zone', u'us-central1-a', '--project', u'fenix-glam-dev']' returned non-zero exit status 1
The gcp_conn_id may not have access to run in the cluster. However, I would like the container to assume the credentials for the glam-fenix-dev project.
This is preventing churn_v2 from being run on Databricks due to the way that it's being deployed. These options are also useful for running jobs that follow the mozetl convention but are not located in mozilla/python_mozetl.
Test issue