datopian / aircan

💨🥫 A Data Factory system for running data processing pipelines, built on Airflow and tailored to CKAN. Includes an evolution of DataPusher and Xloader for loading data into the DataStore.

Home Page: https://tech.datopian.com/flows/

License: Other

Languages: Python 97.87%, Makefile 2.13%
Topics: ckan-datastore, ckan, airflow, csv, google-cloud-composer, dag, airflow-ui

aircan's People

Contributors

anuveyatsu, dependabot[bot], hannelita, l3str4nge, leomrocha, mbeilin, rufuspollock, sagargg, shubham-mahajan


aircan's Issues

Create a multiple node DAG for GCP

Using the same structure as the multiple-node DAG that assumes local files, create a DAG that handles resources on the cloud (CSV and JSON).

Acceptance

  • DAG with multiple nodes doing the entire pipeline on GCP.

Tasks

  • Use aircan from PyPI
    - [ ] Refactor code to reuse existing nodes (moved to #60)

DAG tasks:

  • 1. Read remote CSV (was: upload CSV from CKAN instance to bucket)
  • 2. delete_datastore
  • 3. create_datastore
  • 4. Create JSON file on bucket
  • 5. convert_csv_to_json
  • 6. Send converted JSON file to CKAN

ckanext-aircan (connector) tasks:
- [x] 1. create endpoint to receive Airflow response after processing
- [x] 2. Handle Airflow response
- [ ] 3. If success, download processed json file from bucket

NOTE: This is not the strategy; we will send the processed JSON via API instead.


Analysis

After this long task is complete, we still need to:
- [ ] Handle errors This will be in the next milestone
- [ ] Handle absence of a response from Airflow This will be in the next milestone
- [ ] Delete remote file (create a separate DAG for that) This will be in the next milestone

/aircan_submit will take the DAG name

Acceptance

  • /aircan_submit endpoint will take the DAG name
  • Response gives dag_run id

Tasks

  • Endpoint refactor
  • GCP calls refactor
  • Endpoint returns DAG run id in its response
  • Update Docs

[Bug] lib imports

Description

After #8, the directory structure of the project changed and the imports in the DAGs are broken.

[Refactor] DAGs

Acceptance

  • Receive a description JSON file and obtain resource info (file and fields) from there
  • Pass CKAN env info when triggering DAG
  • No duplicate tasks for the multiple DAGs we have
  • Organize DAGs in subdirectories

Tasks

Relates to #30

[epic] MVP v0.1 for new Data Load approach with AirCan

Load data into CKAN DataStore using Airflow as the runner. Replacement for DataPusher and Xloader. Clean separation of components so you can reuse what you want (e.g. don't use Airflow but use your own runner).

Acceptance

  • Upload of raw file to CKAN triggers load to datastore
  • Manual triggering via UI and API
    • API: datapusher_submit
  • Loads CSV ok (no type casting so all strings)
  • Loads XLSX ok (uses types)
  • Load google sheets
  • Load Tabular Data Resource (uses types)
  • Deploy to cloud instance (e.g. GCP cloud dataflow)
  • Continuous deployment of AirCan (done with DX)
  • Error Handling (when DAG fails, the CKAN instance must know the status)
  • Successful end-to-end run: CKAN instance with ckanext-aircan-connector; upload a CSV file and have a DAG on GCP triggered. End up with the successfully parsed data in the CKAN DataStore.
  • Failed end-to-end run: CKAN instance with ckanext-aircan-connector; upload a CSV file and have a DAG on GCP triggered. The CKAN instance must know something went wrong.

Tasks

  • README describing how to use (README driven development for a new user)
  • Boot a repo
  • local airflow with load code for local csv converting to json #3
  • Local airflow with load code working against local (or remote) CKAN from a local (or remote) CSV #4
    • Migrate Loader library code (from xloader or https://gitlab.com/datopian/tech/data-loader-next-gen/) #9
    • Test setup and running #8 -- DONE: Ready to go after #24 and #27 were merged.
    • Integrate library into airflow i.e. create tasks etc #9 -- DONE: All tasks were done (issue closed).
    • Mocks for library #15
    • Run via UI or tests - can have hardcoded local CSV and CKAN target
  • Trigger it from the API - maybe add this to README (e.g. curl ...) with ability to configure both ckan instance and src csv file

Deploy to GCP

  • Enable Composer on GCP with access for external calls [infra]
  • Deploy AirCan code on Composer (we're doing it via PyPI and also by pasting the raw code into the dependencies folder on the GCP bucket; this second method is easier but for development purposes only)
  • DAG that is triggered remotely by a CKAN instance and processes a CSV file, converting it to JSON and sending results back to CKAN via API

Auth and logging

  • Authentication with GCP - Aircan connector must authenticate with Composer
  • Logging - Logs are enabled on Composer and can be consumed via API. Note: There is no standard format for logging yet

CKAN integration

  • New ckanext with hooks to trigger the run automatically
  • Can be run manually somehow (?)

Analysis

  • For the MVP, should we use data-loader-next [for what] [Ans: processors in the airflow pipeline]? Yes, this is going to be our data processing library
  • Is anyone using data-loader-next? Should we invest time improving its docs? (Not sure if it's necessary) We will be using the core code. Whether we keep this as its own lib is debatable, but 1-2h improving docs now is fine
  • Does data-loader-next offer support for Tabular Data Resources? Yes, sort of, but it does not currently respect types in Table Schema, as it followed the xloader approach where everything is a string.
    • TODO: fix table schema support so types are respected as we will need that when doing proper load
  • Will Airflow orchestrate the load? Yes
  • In Airflow design, how atomic do we make the steps? e.g. do we break out creating table in datastore from load ...? Up to us. Maybe ... but actually not crucial either way.
  • How do we do logging / reporting? See #2

[Infra] Deploy CKAN + Aircan-connector to DX

Acceptance

Tasks

Note: The docs for DX are still under development and the steps here may need adjustments/clarification.

Following https://gitlab.com/datopian/tech/devops/-/blob/master/dx-rfc-dx-approach.md and https://gitlab.com/datopian/tech/devops/-/issues/163, @Irio helped us to come up with the following list:

  • List the services you want to deploy -- DONE -- using this repo without datapusher
    • CKAN
    • Solr
    • PostgreSQL
  • Fork dx-helm-national-grid -- DONE -- dx-helm-ckan-aircan
  • Delete unused files in /templates
  • Get access to the Google Cloud project datopian-dx (ask Irio or Irakli) -- DONE -- got the access to datopian-dx
  • Connect kubectl with your Google Cloud credentials
gcloud auth login
gcloud container clusters get-credentials
  • Create Argo CD project
    • Install argocd cli locally -- cli_installation
    • Run argocd proj create [project name] pointing to remote argo cd -- created project aircan
  • Push dx-helm-national-grid fork to Gitlab/GitHub -- dx-helm-ckan-aircan
  • Create Argo CD application inside this new project (pointing to the Git repo) -- created app aircan-dev

Before deploying, use Terraform (repo dx-terraform) to create a new PSQL instance -- @Irio we need to add documentation about this step.

Moving data out of dags directory to data directory

As an aircan user, I want to separate the data directory from the dags directory so that importing DAGs is lightweight and data can be accessed outside the code.

Acceptance

  • Data should be accessible from dags

Tasks

  • Separate the Data and Dags
  • Make data accessible in dags

Decided to delete the sample files from the DAGs.

[epic] v0.2 Error and Logging

Introducing a status API providing reporting on run status, errors and logs.

Job Stories

When I run a DAG in AirCan I want to

  • know its status (e.g. running, success, failed)
  • (?) other info (e.g. how long it's been running)
  • detailed errors on failure e.g. if it failed ...
  • return results (or pointer to results) on success

so that I can report on this to users and empower them to resolve errors

  • (?) get realtime output (cf gitlab runner)
  • Get notified rather than poll for updates (push notifications rather than pull)

Acceptance

  • An API exists like /api/3/action/aircan_submit?dag_id=... that runs a DAG and returns the run ID
  • An API exists like /api/3/action/aircan_status?run_id=... which reports on status of a run e.g. PENDING | RUNNING | PAUSED | FAILED | SUCCESS and provides error information
  • When a DAG fails, error information is available, including access to full logs (either via the previous API or a new one)
    • Logging - Logs are enabled on Composer and can be consumed via API. Note: There is no standard format for logging yet
    • Failed end-to-end run test: CKAN instance with ckanext-aircan-connector; upload a CSV file and have a DAG on GCP triggered. The CKAN instance must know something went wrong.

FUTURE

  • Callbacks from AirCan to CKAN so that rather than polling we have live status (this would be part of having "Run/Job" objects in CKAN; this is a future item)

Tasks

  • [ ]

Analysis

Client flow

Think of a user using a CKAN instance. A run of a DAG is triggered by the CKAN instance.

The user knows the name of the DAG they'll trigger (at the moment specified in an .env var; this can change later).

They'd then access the following endpoints to get the status of the DAG run:

GET http://ckan:500/api/3/action/dag_run/<dag_id>  # returns all recent runs of that DAG
GET http://ckan:500/api/3/action/dag_run/<dag_id>/<run_id>

They'd see a page with the execution dates for the dag_id

Response from Airflow:

[
    {
        "dag_id": "ckan_api_load_gcp",
        "dag_run_url": "/admin/airflow/graph?dag_id=ckan_api_load_gcp&execution_date=2020-07-09+13%3A21%3A56%2B00%3A00",
        "execution_date": "2020-07-09T13:21:56+00:00",
        "id": 64,
        "run_id": "manual__2020-07-09T13:21:56+00:00",
        "start_date": "2020-07-09T13:21:56.963772+00:00",
        "state": "success"
    },
    {
        "dag_id": "ckan_api_load_gcp",
        "dag_run_url": "/admin/airflow/graph?dag_id=ckan_api_load_gcp&execution_date=2020-07-16+13%3A07%3A02%2B00%3A00",
        "execution_date": "2020-07-16T13:07:02+00:00",
        "id": 65,
        "run_id": "manual__2020-07-16T13:07:02+00:00",
        "start_date": "2020-07-16T13:07:02.100794+00:00",
        "state": "failed"
    },
    
]

The flow we'd need: on CKAN you hit:

POST api/3/aircan_submit?dag_id=XXX&...

In the response to this request, you get back the run_id.

What do you do with this run ID? [For now we can assume the client keeps that run id and it's up to them. Longer term we will have "Run/Job" objects in CKAN.] We'd need to persist it in a DB... otherwise it'll be lost.

Our customized response including access to GCP logs

Response:

[
    {
        "airflow_response": {
            "dag_id": "ckan_api_load_gcp",
            "dag_run_url": "/admin/airflow/graph?dag_id=ckan_api_load_gcp&execution_date=2020-07-09+13%3A21%3A56%2B00%3A00",
            "execution_date": "2020-07-09T13:21:56+00:00",
            "id": 64,
            "run_id": "manual__2020-07-09T13:21:56+00:00",
            "start_date": "2020-07-09T13:21:56.963772+00:00",
            "state": "success"
        },
        "gcp_logs": {
            ... logs for that particular run_id ...
        }
    },
    {
        "airflow_response": {
            "dag_id": "ckan_api_load_gcp",
            "dag_run_url": "/admin/airflow/graph?dag_id=ckan_api_load_gcp&execution_date=2020-07-16+13%3A07%3A02%2B00%3A00",
            "execution_date": "2020-07-16T13:07:02+00:00",
            "id": 65,
            "run_id": "manual__2020-07-16T13:07:02+00:00",
            "start_date": "2020-07-16T13:07:02.100794+00:00",
            "state": "failed"
        },
        "gcp_logs": {
            ... logs for that particular run_id ...
        }
    }
]

They'd get the result of the Airflow API for DAG status (https://airflow.apache.org/docs/stable/rest-api-ref.html), ideally combined with GCP logs.

FAQs

Callbacks [Rufus: this should be later]

Another path to consider (or support both): having an endpoint set up on CKAN ready to receive a POST from AirCan.

i.e. if a task fails while running in a DAG, AirCan sends a notification by hitting an endpoint on CKAN.

kwargs = {
    "resource": ...,
}
dagid = dag_run(**kwargs)

running = True
while running:
    status = dag_status(dagid)
    # status looks like {"running": ...}
    running = status["running"]

Questions to discuss

Questions: Error Handling

  • Specify in the DAG where it fails. Returning "success: False" works for the logs, but we need to trigger the fail action on the task (not being done right now)
  • Treat all corner cases of failing tasks
  • Shall we implement retries?
    • Create a default error set that will be used both in the connector and on Aircan DAGs

Logs

  • Planning to create the job_status page. Correct? What should we see on this page besides the task_id info + logs info?
  • Obtain combined info from Airflow status API + GCloud logs when displaying task status. Sounds good?

Other questions

  • What are the endpoints (on CKAN) that will trigger the DAG? right now we have datastore_create and aircan_submit. Are there any other triggering endpoints?

  • What is the best way to organize the docs? I find the README on aircan extensive and potentially full of non-useful information, i.e. are people going to use aircan standalone?

Use resource schema to set schema fields in data resource that is passed to Factory (and error if absent)

When a user uploads a file with tabular data to the portal (CKAN), we want to guess column names properly, so they can be passed as part of the payload to a DAG.

Acceptance

  • The column names of the uploaded file determine the schema_fields_array payload parameter (currently hard-coded).

Tasks

  • Add a column-name guessing mechanism for tabular data files (currently CSV), as sketched after this list.

  • Populate the schema_fields_array payload parameter using the column-name guessing mechanism above.

  • Write tests for checking functionality.
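A minimal sketch of the guessing mechanism referenced in the tasks above, assuming we read only the CSV header and (for now) treat every column as text; the helper name and payload shape are illustrative, not the implemented aircan API:

import csv

def guess_schema_fields_array(csv_path):
    # Read only the header row and return the column names.
    with open(csv_path, newline='') as f:
        header = next(csv.reader(f))
    return [name.strip() for name in header]

# Hypothetical DAG payload using the guessed names (everything treated as text for now).
payload = {
    "schema_fields_array": guess_schema_fields_array("/tmp/resource.csv"),
}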

DataStore load from local file (library + single node DAG)

Add DataStore loading via API to the library and build a DAG that loads a (configurable) file to a (configurable) CKAN instance (local and/or demo.ckan) using the API only.

Acceptance

  • Library that does loading to DataStore via API (only): load.py now has a load_csv_via_api method (added in #17), which combined with create_datastore_table does the entire flow via the API
  • Create DAG using the datastore library api_load (leave the old datastore-load-via-db-copy aside)
    • DAG can be configured with file location - it is possible to specify file locations for CSV input and JSON output in the params dictionary of the DAG, as shown in #18
    • DAG can be configured with CKAN instance location (and API key) - #16 (#18 improves the code) allows configuring the CKAN instance location (via the Airflow var CKAN_SITE_URL) as well as the API key (CKAN_SYSADMIN_API_KEY); see the configuration sketch after the task list below

Tasks

  • Refactor load.py to have an agnostic config
  • Rewrite load_csv with API approach (similar to datapusher) #17
  • DAG: Specify paths and resources via kwargs or other Operators #17
  • Handle responses from server; i.e. collect resource_id after create and pass it to load #17
  • Trigger DAG on airflow (test trigger works) #18
  • Improve error messages and error handling for each DAG task. At this time they are very generic or do not reflect the real errors - some addressed in #17 and #18; see #20 for a better design proposal
  • "Housekeeping" - refactor params that are being passed (see #16 (comment)), refactor method names, ensure test coverage (relates to #15), refactor and split load library files.
    Note: Tests will be moved to a separate issue; better design for passing env vars in #19
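As a small illustration of the configuration described in the acceptance criteria above, the sketch below assumes the CKAN instance location and API key are stored as Airflow Variables (managed via the Airflow UI); how the values are then passed to the library calls is left out:

from airflow.models import Variable

# Sketch only: CKAN configuration read from Airflow variables, as described above.
ckan_site_url = Variable.get("CKAN_SITE_URL")
ckan_api_key = Variable.get("CKAN_SYSADMIN_API_KEY")
# These values would then be handed to create_datastore_table / load_csv_via_api
# inside the DAG tasks.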

Analysis

Questions

Analysis for Logging system

Logging and reporting is a crucial aspect of a data factory system like this.

  • What kind of logs
  • Log format
  • Log storage
  • Log access

Job stories

When a Run is initiated by an Operator, they want to see that it is running and be notified of application and (meta)data errors as soon as possible, especially "halts", so that they can debug and re-run.

If there are a lot of (data) errors I want to examine these in a system that allows me to analyse / view easily (i.e. don’t have my page crash as it tries to load 100k error messages)
I don’t want to receive 100k error emails …

When a scheduled Run happens as an Operator (Sysadmin), I want to be notified afterwards (with a report?) if something went wrong, so that I can do something about it …

When I need to report to my colleagues about the Harvesting system I want to get an overall report of how it is going, how many datasets are harvested etc so that I can tell them

Domain Model

Status info: this Run is running, it is finished, it took this long …

  • If the process takes longer than I expect, we could show a window with live logs (using the Airflow API). We don't yet have statuses like "running step X", "running step Y", "stopped by error", "finished". We need to add this to the NG Harvester.

(Raw) Log information …

  • Logs on run execution (classic INFO, WARN etc logging)
    • Including handled application errors ERROR
  • (Meta)data errors (and warnings) => What do these look like?
  • (Unhandled) Exceptions or errors (caught by parent system)

Reports / Summaries e.g. 200 records processed, 5 errors, 2 warnings, 8 new datasets, 192 existing records updated

4 cases

  • Run Status Info (Live and Historic)
    • Who: Someone running a Job in realtime: When something does not work I want to see history of jobs (e.g. when have jobs stopped running) so that I can debug
    • Provided by: Orchestrator (i.e. Airflow). TODO: does the orchestrator provide historic info (?)
    • Format: Whatever API that gives
  • App Log
    • Who: Someone running a Job (if they want real-time feedback)
      • Someone debugging a failed job (and a specific source)
      • Someone creating a new pipeline and wanting to debug it
    • Provided by: Logging in the code using std log library and either config of the storage location in code or from orchestrator
    • Format: Regular logs (text format) and a custom JSON file as a final log report
  • (Meta)Data Quality Warn / Errors
    • Who: “Owner” of a harvest source who wants to get those corrected
      • A Harvest Admin who is overseeing the process and wants to know what happened (and maybe how to fix the pipeline)
    • Provided by: Explicit recording as part of application code and a specific error format e.g. https://github.com/frictionlessdata/data-quality-spec
    • Format: Analyze results of the quality tools to use and define some kind of JSON results or report file.
  • Report: E.g. how many runs happened. How many datasets harvested etc.
    • Who: Mostly non-tech people.
      • Someone Running a Job
      • Harvest Admin
    • Provided by: output from NG Harvester. Displayed via new WUI / SPA embedded in CKAN
    • Format: Basic formatting of the (JSON-based) logs file, and then iterate based on feedback

Build a DAG that loads a file to CKAN using DB connection

Build a DAG that loads a file to CKAN (remote demo.ckan) using a DB connection.

Acceptance

  • Run this DAG from a local airflow instance and have a file loaded on remote CKAN

...

Tasks

delete_datastore_table <<
    create_datastore_table <<
    load_csv_to_postgres_via_copy <<
    restore_indexes_and_set_datastore_active
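A minimal sketch of how these four tasks could be wired together with PythonOperator, assuming the pipeline order delete -> create -> load -> restore; the callables are placeholders for the library functions of the same name, so this is an illustration rather than the shipped DAG:

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

def _placeholder(**context):
    # Each task would call the matching library function here,
    # e.g. delete_datastore_table(...), create_datastore_table(...), etc.
    pass

with DAG("ckan_db_copy_load_example", start_date=datetime(2020, 7, 1),
         schedule_interval=None) as dag:
    tasks = [
        PythonOperator(task_id=name, python_callable=_placeholder, provide_context=True)
        for name in (
            "delete_datastore_table",
            "create_datastore_table",
            "load_csv_to_postgres_via_copy",
            "restore_indexes_and_set_datastore_active",
        )
    ]
    # Chain the tasks in order: each one runs only after the previous succeeds.
    for upstream, downstream in zip(tasks, tasks[1:]):
        upstream >> downstream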

Analysis

Authenticate user on Airflow web interface (GCP)

We want to allow only authenticated users on the Airflow GCP webserver.

Acceptance

Tasks

  • Enable basic auth
  • After #42 is done, check any other requirements for this task
  • On ckanext-aircan, establish a distinction between a local airflow instance and an instance on the cloud (GCP)
  • Obtain a GCP client from ckanext-aircan
  • Authenticate Client
  • Parameterize DAG name and invoke DAG on GCP via Airflow API

Analysis

Discuss other authentication methods besides basic auth. By setting the security configuration, it is possible to allow only specific users, groups of users, etc., which is much stronger than basic HTTP auth. In this version, we'll use Google Auth, which requires a token.

Send parameters to a DAG on GCP

Our DAGs require parameters and it looks like Google Cloud Composer does not support them in its interface. We'll have to investigate possible ways to send parameters to our DAG (CKAN resource ID, input CSV, output JSON).
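One possible route is Airflow's experimental REST API, which accepts a conf payload when creating a DAG run; a rough sketch with illustrative values (on Cloud Composer the request must additionally be authenticated, see the auth issue elsewhere in this tracker):

import requests

# Sketch only: trigger a DAG run and pass parameters via the "conf" payload.
payload = {
    "conf": {
        "ckan_resource_id": "RESOURCE_ID",
        "csv_input": "gs://bucket/input.csv",
        "json_output": "gs://bucket/output.json",
    }
}
resp = requests.post(
    "http://localhost:8080/api/experimental/dags/ckan_api_load_gcp/dag_runs",
    json=payload,
)
print(resp.json())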

Acceptance

  • Run DAG with parameters

Tasks

Setup test environment

Set up test environment (CI, formatting, test-coverage)

When I clone/use this repository, I want to be able to know the steps to take in order to install requirements and run the tests so I can contribute back to the code more easily and ensure that high code coverage is maintained during the whole life of this project.

Acceptance

Tasks

  • Get latest changes from PR #11 and build on top of that.

  • Create Makefile with commands requirements and test.

    • Have commands available.
    • Test with current virtualenv.
    • Test with new virtualenv.
  • Add .coveragerc configuration file.

  • Add .isort.cfg configuration file.

  • Add pytest.ini configuration file.

  • Add .flake8 configuration file.

  • Fix directory structure after merging #13. -- DONE: See PR #27

  • Set up GitHub Actions. -- DONE: See PR #24

  • Set up Code Climate and add .codeclimate.yml configuration file. Not doing for now.

  • Set up Travis CI and add .travis.yml configuration file. Switching to GitHub Actions instead.

Analysis

This mostly involves replicating the structure of the repo for Giftless, as it follows best practices and uses similar tools, including the test suite with pytest.

Blockers

Cypress testing

Acceptance

  • Test suite on Cypress

Tasks

  • Test resource_create and validate whether the resource is created
  • Test whether the UI has the Data Dictionary and other details in the resource
  • Test that the datastore has the details of the ingested data
  • Test datastore create
  • Test datastore delete

Run ckanext-aircan with DAGs of this project

Acceptance

Assuming a local CKAN instance and a local Airflow instance:

Tasks

Endpoint for dag_run status

Acceptance

  • <DAG_ID>/dag_run endpoint on CKAN which gives a report with the status of the DAG
  • Collect Airflow default API response for a DAG (for local and remote Airflow instances)

Note: This task won't provide any integration with GCP logs (see #69); it only collects the output of the Airflow API.

Tasks

  • Create endpoint; DAG_RUN is optional. If no DAG_RUN is specified, list the most recent dag_runs
  • If a DAG run is specified, retrieve info only for that particular run (note: when you trigger the DAG, the response brings the DAG_RUN; see #79)
  • Return Airflow API response (local Airflow); a sketch of this call follows after this list
  • Return Airflow API response (remote Airflow). It will require another integration with GCP
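For the local Airflow case, a rough sketch of collecting the default API response via the experimental endpoint (DAG id and host are illustrative):

import requests

# Sketch only: list recent runs of a DAG from Airflow's experimental API.
dag_id = "ckan_api_load_gcp"
runs = requests.get(
    "http://localhost:8080/api/experimental/dags/{}/dag_runs".format(dag_id)
).json()
for run in runs:
    print(run["run_id"], run["state"])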

Pack Aircan core lib into a PyPI package

Packing the code into a PyPI package is necessary to have Aircan running on a cloud provider (GCP).

Acceptance

  • PyPI package aircan-lib deployed and installable via requirements.txt
  • Local DAGs refactored to use the new package

Tasks

Analysis

Update and organize the README after recent changes

After #8 and all the changes on the new DAGs, it is necessary to update the README file to reflect the most recent status of the project.

Acceptance

  • User must successfully replicate described steps and install aircan locally
  • User must successfully run example DAG
  • User must run single_node_dag locally
  • User must run multiple_node DAG successfully
  • User must set up composer successfully with the two previous DAGs

Tasks

  • Cleanup general install instructions
  • Separate sections more clearly (setup + run example / local aircan (single-node DAG / multiple-node DAG) / remote aircan on GCP)
  • Provide section for GCP

Create ckanext-aircan-connector extension

When a user uploads a file with tabular data to the portal (CKAN), we want an Airflow DAG to be triggered so the data is uploaded to the DataStore.

Acceptance

A new CKAN extension exists, similar to ckanext-datapusher, that does the following (TODO: check behaviour of ckanext-datapusher in some detail):

  • Can be configured with a list of files to process (i.e. tabular ones) -- DONE -- DEFAULT_FORMATS
  • On upload of a new file that matches this file list, automatically trigger an import of that file via an Airflow DAG into the datastore
  • Provide an API for manual triggering of an import of an existing file -- DONE -- you can do it as such:
curl -X POST \
-H 'Authorization: YOUR_CKAN_API_KEY' \
-d '{"resource_id": "RESOURCE_ID"}' \
http://CKAN_HOST/api/action/datapusher_submit

with successful response:

{
    "help": "http://ckan:5000/api/3/action/help_show?name=datapusher_submit",
    "success": true,
    "result": {
        "message": "Created <DagRun ckan_api_load_multiple_steps @ 2020-07-06 07:27:15+00:00: manual__2020-07-06T07:27:15+00:00, externally triggered: True>",
        "execution_date": "2020-07-06T07:27:15+00:00",
        "run_id": "manual__2020-07-06T07:27:15+00:00"
    }
}

Tasks

  • Stub new ckanext-aircan extension -- DONE -- repo
  • Write tests for checking functionality (will need to mock aircan API calls).
    • Can port material (if it exists) from ckanext-datapusher logic. Resolved for currently implemented functionality.
  • Implement a push notification mechanism to notify the loader (Airflow DAG) to upload data to the DataStore, like notify from the current version. -- DONE -- implemented in ckanext-aircan

Migrate datastore-next-library code to this repo

Acceptance

  • previous datahub-next-loader code into lib/ folder
  • DAG file invoking loader
  • Mocks Moved to #15

Tasks

  • Copy over existing datastore loader code from https://gitlab.com/datopian/tech/data-loader-next-gen
  • Create an invoking operator for each task individually and make sure they run properly
    • delete_datastore_table
    • create_datastore_table
    • load_csv_to_postgres_via_copy
  • Create mocks for existing functions See #15

Analysis

Questions

  • On load_csv_to_postgres_via_copy, we'll probably need a refactor, since it returns a generic error
except Exception as e:
    return {
        'success': False,
        'message': 'Error during deleting index: {}'.format(e)
    }
  • The original library uses .env vars. Here I'm switching to Airflow vars. Pros: easy to manage everything in the Airflow UI; cons: the code will be tied to Airflow. There is a simple way to refactor it and make it entirely agnostic /cc @rufuspollock

Local airflow with load code for local csv converting to json

Local Airflow with load code for a local CSV converting to JSON. Part of Epic #1.

Acceptance

  • Parameterized task in the examples/ folder; receives a CSV file and outputs a pretty-printed JSON file

Tasks

  • Define Airflow operator (PythonOperator for now)
  • Libraries - converting with pandas
  • Specify in/out directories in kwargs
  • JSON pretty printing
  • Wrap up DAG code (sketched after this list)
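A minimal sketch of the conversion task described above; the DAG id and file paths are illustrative:

import json
from datetime import datetime

import pandas as pd
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

def csv_to_json(csv_input, json_output, **context):
    # Convert the CSV to a list of records and pretty-print it as JSON.
    df = pd.read_csv(csv_input)
    with open(json_output, "w") as f:
        json.dump(df.to_dict(orient="records"), f, indent=2)

with DAG("csv_to_json_example", start_date=datetime(2020, 7, 1),
         schedule_interval=None) as dag:
    convert = PythonOperator(
        task_id="convert_csv_to_json",
        python_callable=csv_to_json,
        op_kwargs={"csv_input": "/tmp/input.csv", "json_output": "/tmp/output.json"},
        provide_context=True,
    )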

Analysis

Questions

  • How to deal with invalid CSV files? This may deserve another issue on the tracker
    • ~rufus definitely deserves another item. Here you assume the CSVs are "tidy/good"
  • tests? (unsure how to do them for this example case)
    • ~rufus - what are you testing? there's the core lib for processing and airflow itself. For airflow itself it would be more UAT like e.g. calling the API and checking there's some output on disk
  • Formatting scripts/libraries? Ideally set them up with tests (i.e. run make test and also get feedback from formatting)
    • ~rufus: can you clarify what you mean by formatting scripts / lib.

Logs from GCP on CKAN

Acceptance

  • In the dag_status endpoint on CKAN-aircan connector, we should receive logs/info from the remote Airflow DAG run on GCP

Tasks

[epic][uber] v1 of CKAN DataStore Load using AirCan

This is the uber-epic for the complete evolution of CKAN DataStore load to AirCan.

Acceptance

  • We are using AirCan in production for data loading to datastore
    • (central?) AirCan service in our cluster
    • CKAN instances updated with connector for AirCan
    • Monitoring / Debugging working i.e. we can see what is happening and if there are issues
  • New UI for CKAN instances for data loading experience ...

Tasks

  • v0.1 MVP DataStore load working including
    • Staging environment: AirCan (Google Cloud Composer) + CKAN instance with extension #47 #66 DONE. Live at https://ckan.aircan.dev.datopian.com/ Repo with the helm chart: https://gitlab.com/datopian/tech/dx-helm-ckan-aircan
    • CI working Passing tests
    • Integration tests of DataStore Load (in Cypress), e.g. we upload a file to a CKAN staging instance and 5m later the data is in the datastore. We have the tests, but they assume a local npm test. Right now they point to a temporary CKAN instance (not to DX) and they run the entire flow. No CI for this on GitHub atm
    • CD of AirCan and CKAN extension into staging (includes terraform setup of GCC). We have an automated deployment script (see #66) but we don't have CD such that changes to AirCan DAGs or the CKAN extension get auto-redeployed.
    • BONUS: BigQuery DAG DONE. DAG is done and working. https://github.com/datopian/aircan/blob/master/aircan/dags/api_ckan_import_to_bq.py
  • v0.2 - errors and logging #65
    • Refactor DAGs and ckanext-aircan etc to take a run_id which you can pass in to the DAG and which it uses in logging etc when running it so we can reliably track logs etc. Also move airflow status info into logs (so we don't depend on AirFlow API).
      • Research how others solve this problem of getting unique run ids per DAG run in AirFlow (and how we could pass this info down into stackdriver so that we can filter logs). Goal is that we have a reliable aircan_status(run_id) function that can be turned into an API in CKAN (or elsewhere)
  • v0.3 - UI integration into CKAN #89
  • v0.4 - improved datastore load e.g. more formats
    • Loads XLSX ok (uses types)
    • Load google sheets
  • v0.5 - harvesting MVP

Plan of work (from 4 nov)

  • Test instance of CKAN + ckanext-aircan (+ AirFlow) https://ckan.aircan.dev.datopian.com/
    • Move this into the "dev/test cluster" @cuducos
  • Instance of Google Cloud Composer and a way to update DAGs there.
    • Should it be a test instance OR could we use production? (Think this is OK in part because we can create new DAGs if we need to, so we don't interfere with existing ones. E.g. suppose we want to update datastore_load_dag and that is being used by production CKAN instances … well, we can create datastore_load_dag_v2.) ANS: Use production
    • Shut down all other Cloud Composer instances
  • Integration test for ckanext-aircan etc: start with a simple CSV. datopian/ckanext-aircan#26 🔥
    • with some large files [automatically generate them], e.g. does the Airflow DAG have an issue, is it very slow …

FUTURE after this

graph TD

v1[v0.1 CSV load working, CI/CD setup with rich tests]
v2[v0.2 errors, logging and UI integration]
v3[UI integration]
v3[v0.3 expand the tasks and  e.g. xlsx, google sheets loading]
v4[v0.4 harvesting ...]


v1 --> v2
v2 --> v3
v3 --> v4

Detailed

graph TD

deploytotest[Deploy DAGs to test GCC]
deploydags[Deploy DAGs into this AirFlow<br/>starting with CKAN data load]
deploygcc[Deploy Airflow<br/>i.e. Google Cloud Composer]

nhsdag[NHS DAG for loading to bigquery]
nhs[NHS Done: instance updated<br/>with extension and working in production]

logging[Logging]
reporting[Reporting]

othersite["Other Site Done"]

start[Start] --> deploygcc

start --> logging
multinodedag --> deploytotest

subgraph General Dev of AirCan
  errors[Error Handling]
  aircanlib[AirCan lib refactoring]
  multinodedag[Multi Node DAG]
  logging --> reporting
end

subgraph Deploy into Datopian Cluster
  deploytotest[Deploy DAGs to test GCC] --> deploydags
  deploygcc --> deploydags
end

subgraph CKAN Integration
  setschema[Set Schema from Resource]
  endckan[End CKAN work]
  setschema --> endckan
end

deploydags --> nhsdag
deploydags --> othersite
endckan --> nhs

subgraph NHS
  nhsdag --> nhs
end

classDef done fill:#21bf73,stroke:#333,stroke-width:1px;
classDef nearlydone fill:lightgreen,stroke:#333,stroke-width:1px;
classDef inprogress fill:orange,stroke:#333,stroke-width:1px;
classDef next fill:lightblue,stroke:#333,stroke-width:1px;

class multinodedag done;
class versioning nearlydone;
class setschema,errors,deploydags,nhsdag,deploygcc inprogress;

Resolve Schema field issue

With the new schema changes (https://github.com/datopian/ckanext-aircan/blob/0ba67147cc0fccfca5e32d5ff16ac16f49541ca9/ckanext/aircan_connector/action.py#L28),
the following error occurs:

Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/models/__init__.py", line 1491, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 112, in execute
    return_value = self.execute_callable()
  File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 117, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/home/airflow/gcs/dags/aircan/dags/api_ckan_load_gcp.py", line 77, in task_create_datastore_table
    schema = eval_schema.get('fields')
AttributeError: 'str' object has no attribute 'get'
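The traceback suggests the schema arrives as a string rather than a dict; a hedged sketch of a defensive fix (not necessarily the patch that was applied) would parse it before calling .get:

import json

def get_schema_fields(eval_schema):
    # Accept the schema either as a dict or as a JSON-encoded string.
    if isinstance(eval_schema, str):
        eval_schema = json.loads(eval_schema)
    return eval_schema.get('fields', [])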

DAG for NHS

Acceptance

  • DAG for NHS deployed on GCP Airflow

Tasks

Local airflow with load code working against CKAN - API DAG

Build a DAG that loads a file to CKAN (remote demo.ckan) using the API only.

Acceptance

  • Run this DAG from a local airflow instance and have a file loaded on remote CKAN

Tasks

Analysis

  • Can we use Airflow's SimpleHttpOperator/HttpSensor? We may be able to do the integration purely via the CKAN API
    • ~Rufus: not sure I understand? You are writing via the API but the source data is on disk, right ...

Deploy to GCP

Do a trial deployment of aircan to GCP cloud composer

Acceptance

  • Running on GCP cloud composer
  • [ ] Successful ingest to e.g. demo.ckan.org via that setup

Tasks

  • Obtain access to GCP Composer
  • Investigate GCP composer and get e.g. UI and CLI working - document learnings and specify adjustments
  • Deploy aircan
    • Pack Aircan core lib into a PyPI package (see #29)
    • Deploy single node DAG #31
    • Create a DAG that handles the remote flow #30
  • Update README (see #37 )

Extras

  • Discuss Auth options (Airflow Auth). Start with basic auth from airflow #32

Refactor - DAG Payload on Aircan Connector

Acceptance

  • Using CKAN API, upload a resource and pass enough information to trigger the DAG with:
"conf": {
  resource: {  // a f11s data resource object
    "path": ckan_resource['url'],
    "format": ckan_resource['format'],
    "ckan_resource_id": ckan_resource['id'],
    "schema": ckan_resource['schema']
  }
  "ckan_config": {
    "api_key": ..
  }
}

~~Note: Airflow does not support nested JSON; we'll have to send the info in a one-level JSON~~
Note 2: when using Jinja templates it doesn't work; when fetching params from **context it works, so we're adopting the original proposal.

"conf": {
    "path": ckan_resource['url'],
    "format": ckan_resource['format'],
    "ckan_resource_id": ckan_resource['id'],
    "schema": ckan_resource['schema']
    "ckan_api_key": ..
}

Tasks

  • Refactor the GCP DAG to receive **context as its only arg (see the sketch after this list)
  • Update action.py on the Aircan connector
  • Update README
  • Update schema key to receive a f11s schema (see #72)
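A minimal sketch of how a task callable could read this payload from **context; key names follow the proposal above, everything else is illustrative:

def task_create_datastore_table(**context):
    # The trigger payload arrives on the DagRun object as "conf".
    conf = context["dag_run"].conf or {}
    resource = conf.get("resource", {})
    ckan_config = conf.get("ckan_config", {})
    path = resource.get("path")
    schema = resource.get("schema")
    api_key = ckan_config.get("api_key")
    # ... hand these values to the aircan library calls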

Bug on creating JSON file on Airflow

If the JSON file already exists, the create action fails, making the DAG fail. Create unique names (based on timestamps) in the JSON creation task; a sketch follows below.
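A minimal sketch of the timestamp-based naming (the base name is illustrative):

from datetime import datetime

def unique_json_name(base="ckan_resource"):
    # Timestamp down to microseconds so re-runs never collide with an existing file.
    return "{}_{}.json".format(base, datetime.utcnow().strftime("%Y%m%dT%H%M%S%f"))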

Update schema key to receive a f11s json

Acceptance

  • A request must send a f11s object in the schema field

In the body of the request, send:

"schema": {
    "fields": [
          {
            "name": "FID",
            "type": "integer",
            "format": "default"
          },
          {
            "name": "Mkt-RF",
            "type": "number",
            "format": "default"
          }
     ]
  }

Tasks

  • Core lib refactor to process the f11s obj instead of an array with the names of the fields
  • Update docs
  • Fix tests

Create classes for Errors and Response

Acceptance

  • Aircan lib code to be wrapped into a class file
  • Create default interface for errors
  • Create default interface for response

Tasks

At this time our internal libraries handle errors and responses in an ad hoc way, for example return {"success": False, "response": response.json()}.

Ideally, we should create proper classes for exceptions, error handling and responses, unifying the way the system behaves (at the library level); a sketch follows below.
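A minimal sketch of what such wrapper classes could look like; names and fields are illustrative, not an agreed interface:

class AircanError(Exception):
    # Base class for errors raised by aircan library functions.
    def __init__(self, message, response=None):
        super().__init__(message)
        self.response = response

class AircanResponse:
    # Uniform return value instead of ad hoc {"success": ..., "response": ...} dicts.
    def __init__(self, success, payload=None, error=None):
        self.success = success
        self.payload = payload
        self.error = error

    def to_dict(self):
        return {"success": self.success, "response": self.payload, "error": self.error}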

Mocks for library

When I work on AirCan as a developer, I want to be able to run the test suite without making network requests, using mocked services instead, so that I can test more effectively without depending on external resources.

Acceptance

  • All the Python functions in lib/load.py are mocked in the test suite and can be run independently from external resources.

Tasks

  • Mock delete_datastore_table (a sketch follows after this list).
  • Mock create_datastore_table.
  • Mock load_csv_to_postgres_via_copy.
  • Mock delete_index.
  • Mock restore_indexes_and_set_datastore_active.
  • Mock _generate_index_name.
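A minimal sketch of one such mock, assuming lib/load.py issues its CKAN API calls with requests; the module path and function arguments are assumptions for illustration:

from unittest import mock

from aircan.lib import load  # module path as described in this issue; adjust if it moves

@mock.patch("aircan.lib.load.requests.post")
def test_delete_datastore_table_mocked(mock_post):
    # No network: the CKAN API response is faked.
    mock_post.return_value.status_code = 200
    mock_post.return_value.json.return_value = {"success": True}
    load.delete_datastore_table("resource-id", "api-key", "http://ckan:5000")
    assert mock_post.called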

Analysis

  • See if mocking lib/api_ckan_load.py and examples/csv_to_json.py is also useful for now.
  • The functions in load.py are using the CKAN API. Comment present:

    This should work over API (not inside CKAN process) or do direct

  • This may be useful to mock lib/api_ckan_load.py: https://github.com/ckan/ckan/tree/master/ckan/tests

make test command returning an error

Acceptance

  • make test should run clean and all tests related to API integration should pass.

Tasks and description

After the refactor from lib to dependencies, some tests have outdated imports.

PR #51 adds some fixes; we still need to

  • Add instructions to the README about make test
  • Fix API-related tests; I believe an .env file is missing

Manual triggering of an import of an existing file via API in GCP - result is null

When a user manually triggers an import of an existing file via the API in GCP, the result is always null.
Sample API call:

curl -X POST \
-H 'Authorization: YOUR_CKAN_API_KEY' \
-d '{"resource_id": "RESOURCE_ID"}' \
http://CKAN_HOST/api/action/datapusher_submit

In case we are running in GCP env (CKAN__AIRFLOW__CLOUD=GCP), the DAG is triggered and
the response is:

{
    "help": "http://ckan:5000/api/3/action/help_show?name=datapusher_submit",
    "success": true,
    "result": null
}

On the other hand, running the same API call in local env (CKAN__AIRFLOW__CLOUD=LOCAL),
we are getting a proper response:

{
    "help": "http://ckan:5000/api/3/action/help_show?name=datapusher_submit",
    "success": true,
    "result": {
        "message": "Created <DagRun ckan_api_load_multiple_steps @ 2020-07-06 07:27:15+00:00: manual__2020-07-06T07:27:15+00:00, externally triggered: True>",
        "execution_date": "2020-07-06T07:27:15+00:00",
        "run_id": "manual__2020-07-06T07:27:15+00:00"
    }
}

Acceptance

  • Manually triggering an import of an existing file via API, in both GCP and local envs, brings a proper response.

Tasks

  • Check the returned value from function invoke_gcp

Better way to send resource fields information

At this time they are sent as an array in the DAG params (e.g. "schema_fields_array": "['field1', 'field2']") and everything is treated as the text type.
What are good alternatives?
Questions:

  • Modify this array to become a dictionary of names and types (one possible shape is sketched after this list). What is a good way to pass it? I believe doing it in plain text can be tedious. What about adding another node to the DAG to fetch the header of the CSV and automatically create a dictionary of fields, hard-coding everything to text?
  • Should we start considering different types?
  • If yes, define how to treat errors
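One possible shape for that dictionary, next to the current array form (types here are illustrative):

Current form (names only, everything text):
"schema_fields_array": "['field1', 'field2']"

Possible form (names and types):
"schema_fields": {
    "field1": "text",
    "field2": "integer"
}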
