Comments (6)
EMR: here is a guide to follow
https://youtu.be/jvbHUl9A4tQ?si=l7AdUR4vmr_5sDIq
Running Apache Hudi DeltaStreamer on EMR Serverless: a hands-on lab with a step-by-step guide for beginners (video-based guide)
Steps
Step 1: Download the sample Parquet files from the links and upload them to the S3 folder as shown in the diagram.
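If you want to script the upload, a minimal boto3 sketch (the file, bucket, and key names here are placeholders, not values from the video):

import boto3

s3 = boto3.client("s3")

# Hypothetical file and bucket names; replace with your own
s3.upload_file(
    Filename="invoice_data_1.parquet",
    Bucket="XXXXXXXXXXXX",
    Key="raw/invoice_data_1.parquet",
)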
Step 2: Start the EMR Serverless application
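This step can also be scripted; a sketch with boto3 (the application name and release label are assumptions — pick a release whose bundled Hudi version matches your jars):

import boto3

client = boto3.client("emr-serverless")

# Create a Spark application; name and releaseLabel are illustrative
response = client.create_application(
    name="hudi-delta-streamer",
    releaseLabel="emr-6.9.0",
    type="SPARK",
)
application_id = response["applicationId"]

# Applications also auto-start on the first job submission
client.start_application(applicationId=application_id)
print("applicationId:", application_id)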
Step 3: Run the Python code below to submit the job
- Please change and edit the variables (replace the XXXX placeholders with your own values)
try:
    import json
    import uuid
    import os

    import boto3
    from dotenv import load_dotenv

    load_dotenv(".env")
except Exception as e:
    pass

# AWS credentials are loaded from the .env file
AWS_ACCESS_KEY = os.getenv("DEV_ACCESS_KEY")
AWS_SECRET_KEY = os.getenv("DEV_SECRET_KEY")
AWS_REGION_NAME = os.getenv("DEV_REGION")

client = boto3.client(
    "emr-serverless",
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
    region_name=AWS_REGION_NAME,
)


def lambda_handler_test_emr(event, context):
    # ------------------ Hudi settings -----------------------------------------
    glue_db = "hudi_db"
    table_name = "invoice"
    op = "UPSERT"
    table_type = "COPY_ON_WRITE"
    record_key = "invoiceid"
    precombine = "replicadmstimestamp"
    partition_field = "destinationstate"
    source_ordering_field = "replicadmstimestamp"
    delta_streamer_source = "s3://XXXXXXXXXXXX/raw"
    hudi_target_path = "s3://XXXXXXXXX/hudi"
    # ---------------------------------------------------------------------------
    # EMR
    # ---------------------------------------------------------------------------
    ApplicationId = "XXXXXXXXXXXXXXX"
    ExecutionTime = 600  # job timeout in minutes
    ExecutionArn = "XXXXXXXXXXXXXXXXXXXXXX"
    JobName = "delta_streamer_{}".format(table_name)
    # ---------------------------------------------------------------------------

    spark_submit_parameters = " --conf spark.jars=/usr/lib/hudi/hudi-utilities-bundle.jar"
    spark_submit_parameters += " --conf spark.serializer=org.apache.spark.serializer.KryoSerializer"
    spark_submit_parameters += " --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
    spark_submit_parameters += " --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog"
    spark_submit_parameters += " --conf spark.sql.hive.convertMetastoreParquet=false"
    spark_submit_parameters += " --conf mapreduce.fileoutputcommitter.marksuccessfuljobs=false"
    spark_submit_parameters += " --conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
    spark_submit_parameters += " --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer"

    arguments = [
        "--table-type", table_type,
        "--op", op,
        "--enable-sync",
        "--source-ordering-field", source_ordering_field,
        "--source-class", "org.apache.hudi.utilities.sources.ParquetDFSSource",
        "--target-table", table_name,
        "--target-base-path", hudi_target_path,
        "--payload-class", "org.apache.hudi.common.model.AWSDmsAvroPayload",
        "--hoodie-conf", "hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator",
        "--hoodie-conf", "hoodie.datasource.write.recordkey.field={}".format(record_key),
        "--hoodie-conf", "hoodie.datasource.write.partitionpath.field={}".format(partition_field),
        "--hoodie-conf", "hoodie.deltastreamer.source.dfs.root={}".format(delta_streamer_source),
        "--hoodie-conf", "hoodie.datasource.write.precombine.field={}".format(precombine),
        "--hoodie-conf", "hoodie.database.name={}".format(glue_db),
        "--hoodie-conf", "hoodie.datasource.hive_sync.enable=true",
        "--hoodie-conf", "hoodie.datasource.hive_sync.table={}".format(table_name),
        "--hoodie-conf", "hoodie.datasource.hive_sync.partition_fields={}".format(partition_field),
    ]

    response = client.start_job_run(
        applicationId=ApplicationId,
        clientToken=str(uuid.uuid4()),
        executionRoleArn=ExecutionArn,
        jobDriver={
            "sparkSubmit": {
                "entryPoint": "command-runner.jar",
                "entryPointArguments": arguments,
                "sparkSubmitParameters": spark_submit_parameters,
            },
        },
        executionTimeoutMinutes=ExecutionTime,
        name=JobName,
    )
    print("response")
    print(response)


lambda_handler_test_emr(event=None, context=None)
Adhoc Query
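The ad-hoc query itself is not included above; a minimal PySpark read-back sketch, assuming the target path and table settings from the step-3 code (placeholders carried over):

from pyspark.sql import SparkSession

# Hudi-aware session; configs mirror the submit-time settings above
spark = (
    SparkSession.builder
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
    .getOrCreate()
)

# Read the Hudi table written by the Delta Streamer job
df = spark.read.format("hudi").load("s3://XXXXXXXXX/hudi")
df.createOrReplaceTempView("invoice")
spark.sql("SELECT invoiceid, destinationstate, replicadmstimestamp FROM invoice LIMIT 10").show()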
@soumilshah1995 thanks for replying. I know how to use the streamer on EMR Serverless; I don't need a tutorial. Can you please help me regarding this particular exception?
The error indicates that Spark cannot find the Hudi data source (org.apache.hudi.DefaultSource), which typically means the required Hudi jar is not properly included or recognized by Spark. Please ensure you are using the right jar files for your version of Spark.
@ROOBALJINDAL Also, are you setting these Spark configs? I can see you are adding the utilities bundle in spark.jars. Is it EMR Serverless? Adding a jar after the Spark session is initialized might not help you:
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'
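On EMR Serverless these configs have to go into the job submission itself rather than be set after the session exists; a sketch of how they would appear in sparkSubmitParameters (the rest of the start_job_run call is as in the step-3 code):

# Sketch: pass the Hudi jars and configs at submit time, since setting
# spark.jars after the Spark session starts has no effect
spark_submit_parameters = (
    " --conf spark.jars=/usr/lib/hudi/hudi-utilities-bundle.jar"
    " --conf spark.serializer=org.apache.spark.serializer.KryoSerializer"
    " --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
    " --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog"
    " --conf spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar"
)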
@ROOBALJINDAL Were you able to make it work? Please let us know.