Coder Social home page Coder Social logo

Comments (6)

soumilshah1995 avatar soumilshah1995 commented on August 16, 2024 1

EMR here is guide to follow

https://youtu.be/jvbHUl9A4tQ?si=l7AdUR4vmr_5sDIq

Running Apache Hudi Delta Streamer On EMR Serverless Hands on Lab step by step guide for beginners

1

Video based guide

Steps

Step 1: Download the sample Parquet files from the links

Uplaod to S3 Folder as shown in diagram

image

Step 2: Start EMR Serverless Cluster

image
image
image

Step 3 Run Python Code to submit Job

  • Please change nd edit the varibales
try:
    import json
    import uuid
    import os
    import boto3
    from dotenv import load_dotenv

    load_dotenv(".env")
except Exception as e:
    pass

global AWS_ACCESS_KEY
global AWS_SECRET_KEY
global AWS_REGION_NAME

AWS_ACCESS_KEY = os.getenv("DEV_ACCESS_KEY")
AWS_SECRET_KEY = os.getenv("DEV_SECRET_KEY")
AWS_REGION_NAME = os.getenv("DEV_REGION")

client = boto3.client("emr-serverless",
                      aws_access_key_id=AWS_ACCESS_KEY,
                      aws_secret_access_key=AWS_SECRET_KEY,
                      region_name=AWS_REGION_NAME)


def lambda_handler_test_emr(event, context):
    # ------------------Hudi settings ---------------------------------------------
    glue_db = "hudi_db"
    table_name = "invoice"
    op = "UPSERT"
    table_type = "COPY_ON_WRITE"

    record_key = 'invoiceid'
    precombine = "replicadmstimestamp"
    partition_feild = 'destinationstate'
    source_ordering_field = 'replicadmstimestamp'

    delta_streamer_source = 's3://XXXXXXXXXXXX/raw'
    hudi_target_path = 's3://XXXXXXXXX/hudi'

    # ---------------------------------------------------------------------------------
    #                                       EMR
    # --------------------------------------------------------------------------------
    ApplicationId = "XXXXXXXXXXXXXXX"
    ExecutionTime = 600
    ExecutionArn = "XXXXXXXXXXXXXXXXXXXXXX"
    JobName = 'delta_streamer_{}'.format(table_name)

    # --------------------------------------------------------------------------------

    spark_submit_parameters = ' --conf spark.jars=/usr/lib/hudi/hudi-utilities-bundle.jar'
    spark_submit_parameters += ' --conf spark.serializer=org.apache.spark.serializer.KryoSerializer'
    spark_submit_parameters += ' --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
    spark_submit_parameters += ' --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
    spark_submit_parameters += ' --conf spark.sql.hive.convertMetastoreParquet=false'
    spark_submit_parameters += ' --conf mapreduce.fileoutputcommitter.marksuccessfuljobs=false'
    spark_submit_parameters += ' --conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory'
    spark_submit_parameters += ' --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer'

    arguments = [
        "--table-type", table_type,
        "--op", op,
        "--enable-sync",
        "--source-ordering-field", source_ordering_field,
        "--source-class", "org.apache.hudi.utilities.sources.ParquetDFSSource",
        "--target-table", table_name,
        "--target-base-path", hudi_target_path,
        "--payload-class", "org.apache.hudi.common.model.AWSDmsAvroPayload",
        "--hoodie-conf", "hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator",
        "--hoodie-conf", "hoodie.datasource.write.recordkey.field={}".format(record_key),
        "--hoodie-conf", "hoodie.datasource.write.partitionpath.field={}".format(partition_feild),
        "--hoodie-conf", "hoodie.deltastreamer.source.dfs.root={}".format(delta_streamer_source),
        "--hoodie-conf", "hoodie.datasource.write.precombine.field={}".format(precombine),
        "--hoodie-conf", "hoodie.database.name={}".format(glue_db),
        "--hoodie-conf", "hoodie.datasource.hive_sync.enable=true",
        "--hoodie-conf", "hoodie.datasource.hive_sync.table={}".format(table_name),
        "--hoodie-conf", "hoodie.datasource.hive_sync.partition_fields={}".format(partition_feild),
    ]

    response = client.start_job_run(
        applicationId=ApplicationId,
        clientToken=uuid.uuid4().__str__(),
        executionRoleArn=ExecutionArn,
        jobDriver={
            'sparkSubmit': {
                'entryPoint': "command-runner.jar",
                'entryPointArguments': arguments,
                'sparkSubmitParameters': spark_submit_parameters
            },
        },
        executionTimeoutMinutes=ExecutionTime,
        name=JobName,
    )
    print("response", end="\n")
    print(response)


lambda_handler_test_emr(context=None, event=None)

Adhoc Query

image

from hudi.

ROOBALJINDAL avatar ROOBALJINDAL commented on August 16, 2024

@soumilshah1995 thanks for replying. I know how to use streamer in emr serverless, dont need tutorial. Can you please help me regariding this particular exception?

from hudi.

soumilshah1995 avatar soumilshah1995 commented on August 16, 2024

the error indicates that Spark cannot find the Hudi data source (org.apache.hudi.DefaultSource), which typically means the required Hudi jar is not properly included or recognized by Spark please ensure you are using right jar files with right version of spark

from hudi.

ad1happy2go avatar ad1happy2go commented on August 16, 2024

@ROOBALJINDAL Also. Are you setting these spark configs. I can see you adding utilities bundle in spark.jars. Is it EMR Serverless? After spark session is initialised adding jar might not help you.

--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'

from hudi.

ad1happy2go avatar ad1happy2go commented on August 16, 2024

@ROOBALJINDAL Were you able to make it work? Please let us know.

from hudi.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.