
ashishp98 / data-engineering-nanodegree-p6-capstone-project


This is the capstone project for the Udacity Data Engineering Nanodegree program. The purpose of the capstone is to combine what was learned throughout the program and, as a result, create a working data warehouse that can be used for analytics.

Jupyter Notebook 20.11% SAS 30.51% Python 49.38%


Udacity Data Engineering Nanodegree Capstone Project

Project Summary

This is the capstone project for the Udacity Data Engineering Nanodegree program. The purpose of the capstone is to combine what was learned throughout the program.

I decided to go with the Udacity-provided project, which is based on I94 immigration data. This is a large data set that contains immigration data for U.S. ports. The plan is to enrich this data using the other suggested data sources and create a working data warehouse that can be used for analytics.

At a high level:

  • Data is extracted from the immigration SAS data, partitioned by year, month, and day, and stored in a data lake on Amazon S3 as Parquet files.
  • The partitioned data is loaded into staging tables in Redshift.
  • The staging data is combined with other staged data sources to produce the final fact and dimension records in the Redshift warehouse.

Ideas on questions we could explore with the final data set:

  • For a given port city, how many immigrants enter from which countries?
  • What are the demographics of the port city and is there any relationship to the country of origin?
  • Is there any relationship between the average temperature of the country of origin and average temperature of the port city of entry?
  • What time of year or month sees more immigration for certain areas?

Data Source Analysis

I94 Immigration Data

This data comes from the U.S. National Tourism and Trade Office. A data dictionary is included in the workspace.

This data is stored as a set of SAS7BDAT files. SAS7BDAT is a database storage file created by Statistical Analysis System (SAS) software to store data. It contains binary encoded datasets used for advanced analytics, business intelligence, data management, predictive analytics, and more. The SAS7BDAT file format is the main format used to store SAS datasets.

The immigration data is partitioned into monthly SAS files. Each file is around 300 to 700 MB. The data provided represents 12 months of data for the year 2016. This is the bulk of the data used in the project.

A data dictionary I94_SAS_Labels_Descriptions.SAS was provided for the immigration data. In addition to descriptions of the various fields, the port and country codes used were listed in table format. I extracted the port codes to a file i94_ports.csv. I extracted the country codes to a file i94_countries.csv. These files were placed in the data lake to be used as a lookup when extracting the immigration data.
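The port-code extraction described above could look roughly like the following. This is a minimal sketch, not the project's actual script: it assumes each lookup line in the SAS labels file has the shape `'CODE' = 'CITY, STATE'`, and the function names and the CSV column names (`port_code`, `city`, `state`) are hypothetical.

```python
import csv
import io
import re

# Assumed line shape in the labels file:  'ALC'  =  'ALCAN, AK     '
PORT_LINE = re.compile(r"^\s*'(?P<code>[^']+)'\s*=\s*'(?P<label>[^']*)'")


def parse_port_lines(lines):
    """Turn lookup lines into dicts; lines that do not match are skipped."""
    rows = []
    for line in lines:
        match = PORT_LINE.match(line)
        if not match:
            continue
        label = match.group("label").strip()
        # Split on the last comma; labels without a comma have no state.
        city, sep, state = label.rpartition(",")
        if not sep:
            city, state = label, ""
        rows.append({
            "port_code": match.group("code").strip(),
            "city": city.strip(),
            "state": state.strip(),
        })
    return rows


def to_csv(rows):
    """Serialize the parsed rows as CSV text (hypothetical column layout)."""
    buf = io.StringIO()
    writer = csv.DictWriter(
        buf, fieldnames=["port_code", "city", "state"], lineterminator="\n"
    )
    writer.writeheader()
    writer.writerows(rows)
    return buf.getvalue()


# Illustrative input lines only; the real file has many more entries.
sample_lines = [
    "   'ALC'\t=\t'ALCAN, AK             '",
    "   'XXX'\t=\t'NOT REPORTED/UNKNOWN  '",
    "/* a comment line that is ignored */",
]
rows = parse_port_lines(sample_lines)
print(to_csv(rows))
```

Entries without a comma end up with an empty state, which is one way the missing city and state values mentioned under Challenges would surface.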

The SAS format can be read fairly easily with pandas in Python or with Apache Spark.

Read SAS data with pandas example:

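import pandas as pd

# Path to one of the monthly SAS files in the Udacity workspace
fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
# Pass format as a keyword; recent pandas versions require keyword arguments here
df = pd.read_sas(fname, format='sas7bdat', encoding="ISO-8859-1")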

In order to load this data into Apache Spark, I added the following JAR packages to my Spark environment:

Reading SAS data with Spark:

df = spark.read.format("com.github.saurfang.sas.spark").load(input_path)

Since this dataset is fairly large and was provided on an attached disk in a JupyterLab environment provided by Udacity, I decided to preprocess it with PySpark in that environment and load it to Amazon S3. I created a PySpark script to extract the SAS data and write it to Parquet format. The Parquet data is partitioned by year, month, and day based on the arrival date of the immigrant. This breaks the monthly SAS immigration data down into easier-to-manage partitions that can later be backfilled through the Apache Airflow pipeline on a daily cadence.

See src/spark/extract_immigration_sas.py
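To illustrate the year/month/day partitioning, here is a small sketch of how an arrival date could be mapped to a partition path. It assumes the arrival date is stored as a SAS date value (days since 1960-01-01) and that partitions use a Hive-style `key=value` directory layout, as Spark's `partitionBy` produces. The bucket name and function names are hypothetical, and this is not the code in the script above.

```python
from datetime import date, timedelta

# SAS date values count days from 1 January 1960.
SAS_EPOCH = date(1960, 1, 1)


def sas_date_to_date(sas_days):
    """Convert a SAS date value (days since 1960-01-01) to a calendar date."""
    return SAS_EPOCH + timedelta(days=int(sas_days))


def partition_path(base, sas_days):
    """Build a Hive-style partition path for the given arrival date."""
    d = sas_date_to_date(sas_days)
    return f"{base}/year={d.year}/month={d.month}/day={d.day}"


# Hypothetical bucket and prefix, for illustration only.
print(partition_path("s3://example-bucket/immigration", 20545))
```

A daily Airflow run can then read only the directory that matches its execution date instead of scanning a whole month.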

World Temperature Data

This dataset came from Kaggle. You can read more about it here.

With this data set I used two sources:

  • GlobalLandTemperaturesByCity.csv (508 MB)
  • GlobalLandTemperaturesByCountry.csv (21.6 MB)

These files contain average temperature data for countries and cities between 1743-11-01 and 2013-09-01. For the city data, I only pulled cities in the United States since we are interested in immigration through U.S. ports. This temperature data would be applied to the port cities extracted from the immigration data. Since the data all fell prior to 2016, I only pulled the latest entry for each city where a temperature was recorded.

Similarly, with the country data I pulled the latest entry that had temperature data for each country. This country data would be applied to the immigration data country of origin.

Both sets of data were extracted one time using Apache Spark and placed in a data lake in Amazon S3 in Parquet format for later processing.

See src/spark/extract_immigration_sas.py
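The "latest entry with a recorded temperature" rule can be sketched in plain Python as below. This is an illustration of the selection logic only, not the Spark job itself. It assumes the Kaggle column names `dt`, `AverageTemperature`, and `City`, and ISO-formatted date strings (which sort chronologically as text). The sample rows are made up.

```python
def latest_recorded(rows, key_field):
    """Return, per key, the most recent row that has a temperature value."""
    latest = {}
    for row in rows:
        # Skip entries where no temperature was recorded.
        if row["AverageTemperature"] in (None, ""):
            continue
        key = row[key_field]
        # ISO dates (YYYY-MM-DD) compare correctly as strings.
        if key not in latest or row["dt"] > latest[key]["dt"]:
            latest[key] = row
    return latest


# Made-up rows for illustration; values are not taken from the data set.
sample = [
    {"dt": "2013-07-01", "City": "Boston", "AverageTemperature": "22.5"},
    {"dt": "2013-08-01", "City": "Boston", "AverageTemperature": "21.0"},
    {"dt": "2013-09-01", "City": "Boston", "AverageTemperature": ""},
    {"dt": "2013-08-01", "City": "Miami", "AverageTemperature": "28.3"},
]
result = latest_recorded(sample, "City")
print(result["Boston"]["dt"])
```

The same function works for the country file by passing the country column name as `key_field`.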

U.S. City Demographic Data

This data comes from OpenDataSoft. You can read more about it here.

This data set contains demographic data for U.S. cities. It was placed in the data lake as a single CSV file (246 KB). It will be combined with port city data to provide ancillary demographic information for port cities.

Airport Code Table

This is a simple table of airport codes and corresponding cities. It comes from here.

This data is a single CSV file (5.8 KB). It provides additional information for airports and can be combined with the immigration port city info.

Data Model

For this project I went with a snowflake schema. There is one fact table with the main immigration data with multiple dimension tables surrounding it. Some of the dimension tables are connected to other dimension tables. For instance, ports have demographics and optional airport information.

Data Dictionary

Data Model Entity Relationship Diagram

Data Model

Choice of Technologies

The main technologies used are:

  • Amazon S3 - I used S3 for data lake storage of the data to be processed. While we are building a data warehouse for a certain type of analysis, the data lake contains the cleaned raw data which could be used for a different type of analysis at a later time.
  • Apache Spark - I used Spark primarily to extract, clean, and partition the immigration data. Because the data was provided in a JupyterLab with attached storage and the files are so large, I decided to preprocess the data through Spark in that environment. In production we would probably add a DAG to Apache Airflow to submit a job to a Spark cluster on a monthly basis or as needed.
  • Apache Airflow - I used Apache Airflow as a tool for the primary data pipeline. The pipeline schedules and coordinates the flow of data from the S3 data lake to Amazon Redshift and performs quality checks along the way. Airflow makes it easy to set up the pipeline and make adjustments as requirements change over time.

Data Pipeline

The main data pipeline uses Apache Airflow to process immigration data for a single day at a time. It brings in the immigration data from Amazon S3 and combines it with other staging data for ports, airports, countries, city and country temperatures, and city demographics.

Airflow uses directed acyclic graphs (DAGs) to describe a pipeline workflow. Each DAG is made up of tasks, which are the nodes of the graph. Each task implements an operator of some type to execute code.
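As a rough illustration of the DAG idea, independent of Airflow itself, the sketch below uses Python's standard `graphlib` module (Python 3.9+) to derive an execution order from task dependencies. The two quality-check task names come from this project; the other task names and the dependency layout are hypothetical.

```python
from graphlib import TopologicalSorter

# Maps each task to the set of tasks it depends on.
# Only the two *_data_quality_check names are taken from the project;
# the rest are placeholders for illustration.
dependencies = {
    "stage_immigration": set(),
    "staging_count_data_quality_check": {"stage_immigration"},
    "load_dimensions": {"staging_count_data_quality_check"},
    "load_fact": {"load_dimensions"},
    "staging_to_fact_data_quality_check": {"load_fact"},
}

# static_order() yields tasks so that every task follows its dependencies.
order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

Airflow performs the same kind of ordering when it schedules tasks, and additionally handles retries, scheduling intervals, and run state.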

Ultimately I wanted to use Amazon Redshift as the data warehouse, but in order to avoid the cost of running a Redshift cluster, I did most of the early development against a local copy of PostgreSQL.

I created four DAGs, two for Postgres and two for Amazon Redshift:

  • setup_postgres.py - This DAG is responsible for creating the schema within a Postgres database and loading some static staging data.
  • setup_redshift.py - This DAG is responsible for creating the schema within an Amazon Redshift database and loading some static staging data.

Setup Airflow DAG

  • import_i94_postgres.py - This DAG loads the immigration data into a staging table in Postgres and then combines it with other staging data to produce the final dimension and fact table entries that are inserted.
  • import_i94_redshift.py - This DAG loads the immigration data into a staging table in Redshift and then combines it with other staging data to produce the final dimension and fact table entries that are inserted.

Import I94 Airflow DAG

Custom Apache Airflow Operators

I created four custom Apache Airflow operators to use within the pipeline.

  • DataQualityOperator - This operator runs a SQL check query against a database and tests the result with a predicate
    • conn_id - The Airflow connection ID for a Postgres or Redshift database
    • sql_check_query - The SQL query used as a data quality check
    • expected_results - A lambda function that acts as a predicate to test the results of the query above
  • StageToRedshiftOperator - This operator makes it easy to move data in various formats from S3 to a staging table in Redshift
    • redshift_conn_id - The Redshift Airflow connection ID
    • aws_credentials_id - The Airflow connection ID for AWS (used for access to S3)
    • table_name - The staging table name to write to
    • s3_path - The path of the S3 bucket to read data from (this field is templated)
    • copy_format - The format of the source data (CSV, AVRO, PARQUET, etc.)
    • truncate_table - If true the table is truncated prior to adding new data
  • StageCsvToPostgresOperator - Moves CSV data into Postgres using COPY statement.
    • postgres_conn_id - The Airflow connection ID for a Postgres database
    • csv_path - The path to CSV file accessible from Postgres server
    • table_name - The staging table name to load data into
    • delimiter - The delimiter of the CSV file
    • truncate_table - If true the table is truncated prior to adding new data
  • StageParquetToPostgres - Moves Parquet data to a Postgres staging table. Postgres has no built-in COPY support for Parquet, so this operator uses pandas to read and load the data.
    • postgres_conn_id - The Airflow connection ID for a Postgres database
    • parquet_path - The path to Parquet file accessible from Postgres server (this field is templated to allow for partitioned data load based on execution date)
    • table_name - The staging table name to load data into
    • truncate_table - If true the table is truncated prior to adding new data
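To make the CSV staging step more concrete, the sketch below shows how the SQL issued by an operator like StageCsvToPostgresOperator might be assembled from its parameters. It is an assumption about the approach, not the operator's actual code: the function name is hypothetical, and it assumes the CSV file has a header row.

```python
def build_copy_statements(table_name, csv_path, delimiter=",", truncate_table=False):
    """Return the SQL statements for an optional truncate followed by a COPY.

    The values are interpolated directly into the SQL text, so they must
    come from trusted pipeline configuration, never from user input.
    """
    statements = []
    if truncate_table:
        # Clear the staging table so reruns do not duplicate rows.
        statements.append(f"TRUNCATE TABLE {table_name};")
    # Server-side COPY: csv_path must be readable by the Postgres server.
    statements.append(
        f"COPY {table_name} FROM '{csv_path}' "
        f"WITH (FORMAT csv, HEADER true, DELIMITER '{delimiter}');"
    )
    return statements


# Hypothetical table name and path, for illustration only.
for sql in build_copy_statements(
    "staging_ports", "/data/i94_ports.csv", truncate_table=True
):
    print(sql)
```

Truncating before loading keeps the staging load idempotent, which matters when a daily run is retried or backfilled.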

Data Quality

There are two data quality checks within the pipeline.

  • staging_count_data_quality_check - This task uses the DataQualityOperator custom Airflow operator. The check ensures that, after staging the immigration data for a particular date from Amazon S3 to Amazon Redshift, there are records in the staging table. While it is possible that a particular day has no records, it is highly unlikely and at the very least should be investigated.
  • staging_to_fact_data_quality_check - This task also uses the DataQualityOperator custom Airflow operator. This check ensures that the right amount of data was added to the immigration fact table from the staging table. It compares the count of records for a particular day in the fact table with the count of records in the staging table.

Failure to meet the criteria of the data quality check will result in a failure of that particular run in Airflow.
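The two checks can be sketched outside Airflow as follows. This is a minimal illustration that uses an in-memory SQLite database as a stand-in for Redshift or Postgres. The function name, the staging table name, and the choice to pass the first column of the first result row to the predicate are assumptions, not the operator's documented behavior.

```python
import sqlite3


def run_quality_check(conn, sql_check_query, expected_results):
    """Run a check query and raise if the predicate rejects its result."""
    rows = conn.execute(sql_check_query).fetchall()
    # Assumption: the predicate receives the first column of the first row.
    if not rows or not expected_results(rows[0][0]):
        raise ValueError(f"Data quality check failed: {sql_check_query}")
    return rows[0][0]


# Stand-in database with hypothetical, simplified tables.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE staging_immigration (id INTEGER)")
conn.execute("CREATE TABLE fact_immigration (id INTEGER)")
conn.executemany("INSERT INTO staging_immigration VALUES (?)", [(1,), (2,), (3,)])
conn.executemany("INSERT INTO fact_immigration VALUES (?)", [(1,), (2,), (3,)])

# Check 1: the staging table must not be empty.
staged = run_quality_check(
    conn, "SELECT COUNT(*) FROM staging_immigration", lambda n: n > 0
)

# Check 2: the fact table must hold as many rows as were staged.
diff = run_quality_check(
    conn,
    "SELECT (SELECT COUNT(*) FROM fact_immigration)"
    " - (SELECT COUNT(*) FROM staging_immigration)",
    lambda d: d == 0,
)
print(staged, diff)
```

Raising an exception is what turns a failed check into a failed task run when the same logic lives inside an Airflow operator.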

In addition to these two checks, foreign key constraints were used to ensure data integrity between the dimension tables and the fact table.

Questions

We are asked to answer how we would approach the problem differently under the following scenarios:

  • If the data was increased by 100x.
    • If Spark in standalone mode cannot process a data set 100x larger, we could consider moving the processing to Amazon EMR, a managed cluster service for processing large data sets in the cloud.
  • If the pipelines were run on a daily basis by 7am.
    • Apache Airflow could be used to build an ETL data pipeline that regularly updates the data and populates a report. Apache Airflow also integrates well with Python and AWS. More applications can be combined to deliver more powerful task automation.
  • If the database needed to be accessed by 100+ people.
    • Amazon Redshift can handle up to 500 connections. If this single-source-of-truth (SSOT) database will be accessed by 100+ people, we can move it to Redshift with confidence that it will handle the load. A cost/benefit analysis will be needed before implementing this cloud solution.

Challenges

  • The immigration data set covers 2016, but the temperature data set only runs through 2013, so it cannot show temperatures for 2016.

  • State and city values are missing for some entries in the label description file. This makes it hard to join the immigration tables with the demographics tables.

Example Data Usage

-- Top 10 countries by immigrant arrivals for the month of January

SELECT c.country, COUNT(*) AS count
FROM fact_immigration i
INNER JOIN dim_countries c ON i.country_id = c.country_id
INNER JOIN dim_time t ON i.arrdate = t.sas_timestamp
WHERE t.year = 2016 AND t.month = 1
GROUP BY c.country
ORDER BY count DESC
LIMIT 10

Immigrants by country of origin

-- Top 10 ports of entry by immigrant arrivals for the month of January

SELECT p.port_city, p.port_state, COUNT(*) as count
FROM fact_immigration i
INNER JOIN dim_ports p ON i.port_id = p.port_id
INNER JOIN dim_time t ON i.arrdate=t.sas_timestamp
WHERE t.year=2016 AND t.month=1
GROUP BY p.port_city, p.port_state
ORDER BY count DESC
LIMIT 10

Immigrants by port of entry

-- Arrivals by day of week

SELECT t.day_of_week,COUNT(*) as count
FROM fact_immigration i
INNER JOIN dim_ports p ON i.port_id = p.port_id
INNER JOIN dim_time t ON i.arrdate=t.sas_timestamp
WHERE t.year=2016 AND t.month=1
GROUP BY t.day_of_week
ORDER BY t.day_of_week

Immigrants by day of week
