
Serverless data pipeline with API Gateway, AWS Lambda, Amazon Kinesis, S3, and Snowflake

apigateway kinesis-data-streams kinesis-firehose lambda-functions s3 snowflake sqs-queue


Serverless Data Stream with Kinesis Data Stream, Kinesis Firehose, S3, and Snowflake

(Architecture diagram: Kinesis_ApiGateway_Snowflake)

Use Postman to make an API call; Amazon API Gateway triggers a Lambda function; the data lands in an S3 bucket; then Snowpipe loads it into Snowflake. To avoid activating Snowpipe for every API call and to optimize the process, we use an Amazon Kinesis Data Stream to collect the data first (flushing based on buffer size, elapsed time, or when only a few messages have arrived), then Kinesis Firehose to deliver the batched data into S3.

Step 1: Create the Lambda Role

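The console steps can also be expressed in code. Below is a minimal boto3 sketch, assuming a hypothetical role name project3_lambda_kinesis_role and the AWS managed policies AWSLambdaBasicExecutionRole and AmazonKinesisFullAccess as stand-ins for whatever permissions were attached in the console.

        import json
        import boto3

        iam = boto3.client('iam')

        # Trust policy that lets the Lambda service assume this role
        trust_policy = {
            "Version": "2012-10-17",
            "Statement": [{
                "Effect": "Allow",
                "Principal": {"Service": "lambda.amazonaws.com"},
                "Action": "sts:AssumeRole"
            }]
        }

        iam.create_role(
            RoleName="project3_lambda_kinesis_role",  # hypothetical name
            AssumeRolePolicyDocument=json.dumps(trust_policy)
        )

        # Basic CloudWatch logging plus write access to Kinesis Data Streams
        iam.attach_role_policy(
            RoleName="project3_lambda_kinesis_role",
            PolicyArn="arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
        )
        iam.attach_role_policy(
            RoleName="project3_lambda_kinesis_role",
            PolicyArn="arn:aws:iam::aws:policy/AmazonKinesisFullAccess"
        )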

Step 2: Create the Lambda function that reads the data from API Gateway and puts it into the Kinesis Data Stream

        import json
        import boto3

        # Kinesis client is created once and reused across warm invocations
        client = boto3.client('kinesis')

        def lambda_handler(event, context):
            # Forward the request body received from API Gateway to the Kinesis Data Stream
            data = json.dumps(event['body'])
            client.put_record(StreamName="project3_kinesis_apigateway", Data=data, PartitionKey="1")
            print("Data Inserted")
            return {'statusCode': 200, 'body': 'Data inserted into Kinesis'}

Step 3: Create the API Gateway and integrate it with the Lambda function created in Step 2

  • Create a simple HTTP API and integrate it with the Lambda function (a scripted sketch follows this list).
  • Configure the route: POST method.
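For reference, the same HTTP API can be created programmatically. A minimal sketch using boto3's quick-create is shown below; the account ID is taken from the storage-integration role ARN used later, the function name comes from the endpoint URL, and the API name is hypothetical, so treat the exact values as assumptions. Note that quick-create deploys to a $default stage, whereas the console setup here used a dev stage.

        import boto3

        apigw = boto3.client('apigatewayv2')
        lambda_client = boto3.client('lambda')

        function_arn = ("arn:aws:lambda:us-east-1:200105849428:"
                        "function:project3_lambda1_kinesis_apigateway")

        # Quick-create an HTTP API with a default Lambda proxy integration and a POST route
        api = apigw.create_api(
            Name="project3_http_api",  # hypothetical name
            ProtocolType="HTTP",
            Target=function_arn,
            RouteKey="POST /project3_lambda1_kinesis_apigateway"
        )
        print(api['ApiEndpoint'])

        # Allow API Gateway to invoke the Lambda function
        lambda_client.add_permission(
            FunctionName="project3_lambda1_kinesis_apigateway",
            StatementId="allow-apigateway-invoke",
            Action="lambda:InvokeFunction",
            Principal="apigateway.amazonaws.com"
        )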

Get the API endpoint: https://fwpwl5qova.execute-api.us-east-1.amazonaws.com/dev/project3_lambda1_kinesis_apigateway

Step 4: Create a Kinesis Data Stream to receive the data from the Lambda function created in Step 2

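A minimal boto3 sketch of the stream creation, assuming the stream name used by the producer Lambda above and a single shard (sufficient for this low-volume demo):

        import boto3

        kinesis = boto3.client('kinesis')

        # Stream name must match the one used in the producer Lambda's put_record call
        kinesis.create_stream(
            StreamName="project3_kinesis_apigateway",
            ShardCount=1
        )

        # Wait until the stream is ACTIVE before wiring it to Firehose
        kinesis.get_waiter('stream_exists').wait(StreamName="project3_kinesis_apigateway")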

Step 5: Create a second Lambda function to process the data before it is delivered to S3


This second Lambda function transforms the data (decoding each record and adding a delimiter between records) to make it easy to load into Snowflake. We convert the binary payload into a string, append a \n to start a new line, and re-encode the result as base64, which is the format Kinesis Firehose expects back from a transformation Lambda.

        import base64

        def lambda_handler(event, context):
            # Collect the transformed records for this invocation only
            # (a module-level list would keep growing across warm invocations)
            output = []

            for record in event['records']:
                # Firehose delivers each record base64-encoded; decode it back to a string
                payload = base64.b64decode(record['data']).decode('utf-8')
                print('payload:', payload)

                # Append a newline so each record lands on its own line in the S3 object
                row_w_newline = payload + "\n"

                # Firehose expects the transformed data re-encoded as base64 text
                encoded = base64.b64encode(row_w_newline.encode('utf-8')).decode('utf-8')

                output.append({
                    'recordId': record['recordId'],
                    'result': 'Ok',
                    'data': encoded
                })

            print('Processed {} records.'.format(len(event['records'])))

            return {'records': output}

Step 6: Create an S3 bucket that will be the Kinesis Firehose destination

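A one-line boto3 sketch of the bucket creation; the bucket name is the one referenced later in the Snowflake storage integration, and us-east-1 is assumed from the API endpoint region:

        import boto3

        s3 = boto3.client('s3', region_name='us-east-1')

        # Bucket referenced later in STORAGE_ALLOWED_LOCATIONS; us-east-1 needs no LocationConstraint
        s3.create_bucket(Bucket="gakas-project3-kinesis-apigateway")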

Step 7: Create Kinesis Firehose

  • Create the Kinesis Firehose delivery stream (a scripted sketch follows this list).
  • Activate "Transform source records with AWS Lambda" and select the second Lambda function from Step 5.
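A boto3 sketch of the delivery stream, with the Kinesis stream as source, the S3 bucket as destination, and the transformation Lambda wired in. The delivery-stream name, the Firehose IAM role, and the transform function ARN are hypothetical placeholders; the buffering values are illustrative.

        import boto3

        firehose = boto3.client('firehose')

        account_id = "200105849428"  # from the storage-integration role ARN below
        firehose_role_arn = f"arn:aws:iam::{account_id}:role/project3_firehose_role"  # hypothetical role

        firehose.create_delivery_stream(
            DeliveryStreamName="project3_firehose",  # hypothetical name
            DeliveryStreamType="KinesisStreamAsSource",
            KinesisStreamSourceConfiguration={
                "KinesisStreamARN": f"arn:aws:kinesis:us-east-1:{account_id}:stream/project3_kinesis_apigateway",
                "RoleARN": firehose_role_arn
            },
            ExtendedS3DestinationConfiguration={
                "BucketARN": "arn:aws:s3:::gakas-project3-kinesis-apigateway",
                "RoleARN": firehose_role_arn,
                # Dump based on buffer size or elapsed time, whichever comes first
                "BufferingHints": {"IntervalInSeconds": 60, "SizeInMBs": 1},
                # Transform source records with the second Lambda function
                "ProcessingConfiguration": {
                    "Enabled": True,
                    "Processors": [{
                        "Type": "Lambda",
                        "Parameters": [{
                            "ParameterName": "LambdaArn",
                            "ParameterValue": f"arn:aws:lambda:us-east-1:{account_id}:function:project3_lambda2_transform"  # hypothetical
                        }]
                    }]
                }
            }
        )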

Step 8: Create an IAM role for Snowflake.


Storage Integration Creation: copy the Snowflake IAM role ARN and the S3 bucket location into the Snowflake console; they become STORAGE_AWS_ROLE_ARN and STORAGE_ALLOWED_LOCATIONS below.

		create warehouse s3_to_snowflake_wh;
		use s3_to_snowflake_wh;
		--Specify the role
		use role ACCOUNTADMIN;
		
		drop database if exists s3_to_snowflake;
		
		--Database Creation 
		create database if not exists s3_to_snowflake;
		
		--Specify the active/current database for the session.
		use s3_to_snowflake;
		
		
		--Storage Integration Creation
		create or replace storage integration s3_int
		TYPE = EXTERNAL_STAGE
		STORAGE_PROVIDER = S3
		ENABLED = TRUE 
		STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::200105849428:role/project3_snwoflake_role'
		STORAGE_ALLOWED_LOCATIONS = ('s3://gakas-project3-kinesis-apigateway')
		COMMENT = 'Testing Snowflake getting refresh or not';
		
		--Describe the Integration Object
		DESC INTEGRATION  s3_int;
		
		
		--External Stage Creation
		
		create stage mystage
		  url = 's3://gakas-project3-kinesis-apigateway'
		  storage_integration = s3_int;
		
		list @mystage;
		
		--File Format Creation
		create or replace file format my_json_format
		type = json;
		
		 
		--Table Creation
		create or replace external table s3_to_snowflake.PUBLIC.Person with location = @mystage file_format ='my_json_format';
		show external tables;
		
		select * from s3_to_snowflake.public.person;
		 
		 
		--Query the table
		select parse_json(VALUE):Age as Age  , trim(parse_json(VALUE):Name,'"') as Name from  s3_to_snowflake.PUBLIC.Person;

Copy the STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID values returned by DESC INTEGRATION from Snowflake and update the trust policy of the Snowflake role in IAM (a sketch follows).
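A boto3 sketch of that trust-policy update; the two Snowflake values below are placeholders for what DESC INTEGRATION actually returns, and the role name is taken from the STORAGE_AWS_ROLE_ARN above.

        import json
        import boto3

        iam = boto3.client('iam')

        # Placeholder values: replace with the output of DESC INTEGRATION s3_int
        storage_aws_iam_user_arn = "arn:aws:iam::111111111111:user/example"   # STORAGE_AWS_IAM_USER_ARN
        storage_aws_external_id = "AB12345_SFCRole=EXAMPLE"                   # STORAGE_AWS_EXTERNAL_ID

        trust_policy = {
            "Version": "2012-10-17",
            "Statement": [{
                "Effect": "Allow",
                "Principal": {"AWS": storage_aws_iam_user_arn},
                "Action": "sts:AssumeRole",
                "Condition": {"StringEquals": {"sts:ExternalId": storage_aws_external_id}}
            }]
        }

        iam.update_assume_role_policy(
            RoleName="project3_snwoflake_role",  # role referenced in STORAGE_AWS_ROLE_ARN
            PolicyDocument=json.dumps(trust_policy)
        )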


Create an event notification on the S3 bucket so Snowflake is notified when new files arrive (a sketch follows below).

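A boto3 sketch of the bucket notification; the queue ARN below is a placeholder for the notification_channel value that Snowflake reports for the external table (via SHOW EXTERNAL TABLES).

        import boto3

        s3 = boto3.client('s3')

        # Placeholder: use the notification_channel ARN shown by SHOW EXTERNAL TABLES in Snowflake
        snowflake_sqs_arn = "arn:aws:sqs:us-east-1:111111111111:sf-snowpipe-EXAMPLE"

        # Notify Snowflake's queue whenever a new object lands in the bucket
        s3.put_bucket_notification_configuration(
            Bucket="gakas-project3-kinesis-apigateway",
            NotificationConfiguration={
                "QueueConfigurations": [{
                    "QueueArn": snowflake_sqs_arn,
                    "Events": ["s3:ObjectCreated:*"]
                }]
            }
        )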

Step 9: Test the pipeline

  • Open Postman and POST a series of records to the API endpoint (a scripted alternative is sketched after this list): {"Name": "wang", "Age":4} {"Name": "ali", "Age":32} {"Name": "li", "Age":54} {"Name": "Moctar", "Age":44} {"Name": "she", "Age":86} {"Name": "Abdoul", "Age":22} {"Name": "lie", "Age":34} {"Name": "Cheng", "Age":55} {"Name": "Karim", "Age":23} {"Name": "ram", "Age":34} {"Name": "li", "Age":23} {"Name": "she", "Age":36}

  • Check the Kinesis Data Stream and Kinesis Firehose metrics.

  • Check the S3 bucket for the delivered files.
  • Check the data in Snowflake.
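As a scripted alternative to Postman, here is a small Python sketch that posts a few of the records above to the endpoint (it assumes the requests package is installed and that the API accepts a raw JSON body, since the producer Lambda simply forwards event['body']):

        import json
        import requests

        endpoint = ("https://fwpwl5qova.execute-api.us-east-1.amazonaws.com"
                    "/dev/project3_lambda1_kinesis_apigateway")

        records = [
            {"Name": "wang", "Age": 4},
            {"Name": "ali", "Age": 32},
            {"Name": "li", "Age": 54},
        ]

        # Each POST goes through API Gateway -> Lambda -> Kinesis -> Firehose -> S3 -> Snowflake
        for record in records:
            response = requests.post(endpoint, data=json.dumps(record))
            print(response.status_code, response.text)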
