
Comments (8)

YousifS7 commented on August 27, 2024

Hi @wombatu-kun,

I wanted to provide an update on this issue. After some experimentation, I was able to resolve the problem with the hoodie.streamer.schemaprovider.registry.targetUrl configuration. Initially I had assumed that Spark would infer the schema from the transformed data; instead, I registered a custom flattened schema in my schema registry to match the desired output and pointed hoodie.streamer.schemaprovider.registry.targetUrl at it. Once I ran the job with these configurations, it worked.

Thank you

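For reference, a minimal sketch of the two registry settings involved, assuming a Confluent-compatible schema registry at localhost:8081 and the subject name used later in this thread; the flattened subject name below is hypothetical:

# Source schema: the Debezium envelope registered for the Kafka topic
hoodie.streamer.schemaprovider.registry.url=http://localhost:8081/subjects/test.dbo.some_table-value/versions/latest
# Target schema: a separately registered, flattened schema matching the transformer output
hoodie.streamer.schemaprovider.registry.targetUrl=http://localhost:8081/subjects/test.dbo.some_table-flattened-value/versions/latest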

wombatu-kun commented on August 27, 2024

Try CACHE TABLE dbz_fields AS SELECT ts_ms, source_fields FROM dbz_events; (source_fields without .*).


YousifS7 commented on August 27, 2024

Hi @wombatu-kun, thank you for getting back to me.

I have modified the SQL file to the following:

CACHE TABLE dbz_filtered AS
SELECT ts_ms, op, before, after
FROM <SRC>
WHERE op IN ('d', 'u', 'c', 'r');

CACHE TABLE dbz_events AS
SELECT ts_ms,
       CASE WHEN op = 'd' THEN before ELSE after END AS source_fields,
       CASE WHEN op = 'd' THEN true ELSE false END AS is_deleted
FROM dbz_filtered;

CACHE TABLE dbz_fields AS
SELECT ts_ms, source_fields FROM dbz_events;

SELECT source_fields.*,
       Concat(source_fields.col1, source_fields.col2) AS hudi_key,
       YEAR(FROM_UNIXTIME(source_fields.col2 / 1000)) AS partition_path
FROM dbz_fields;

I'm still getting the same error:

HoodieKeyException: recordKey value: "null" for field: "hudi_key" cannot be null or empty.

Not sure if I'm missing something else.

Thank you

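As an aside, one common source of this particular error is NULL inputs: in Spark SQL, concat returns NULL if any argument is NULL, so any row with a NULL col1 or col2 yields a NULL hudi_key. A null-tolerant sketch of the final SELECT, assuming col1 and col2 are castable to string (concat_ws skips NULL arguments instead of propagating them):

SELECT source_fields.*,
       -- concat_ws ignores NULLs, so the key degrades gracefully instead of becoming NULL
       concat_ws('', CAST(source_fields.col1 AS STRING), CAST(source_fields.col2 AS STRING)) AS hudi_key,
       YEAR(FROM_UNIXTIME(source_fields.col2 / 1000)) AS partition_path
FROM dbz_fields;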

wombatu-kun commented on August 27, 2024

What if you try this?

CACHE TABLE dbz_fields AS
SELECT ts_ms, source_fields FROM dbz_events;

SELECT s.*,
       Concat(s.col1, s.col2) AS hudi_key,
       YEAR(FROM_UNIXTIME(s.col2 / 1000)) AS partition_path
FROM dbz_fields s;


YousifS7 commented on August 27, 2024

Trying the suggested SQL is giving the following error:

Exception in thread "main" org.apache.spark.sql.AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `s`.`col1` cannot be resolved. Did you mean one of the following? [`s`.`ts_ms`, `s`.`source_fields`].


YousifS7 commented on August 27, 2024

Following up on this, I was able to run the Spark app without errors. However, even when specifying columns that live inside the 'before' and 'after' struct types extracted from Kafka, only the following columns were written to the Hudi table:

ts_ms, op, before, after, source

Transformer File Query

CACHE TABLE dbz_filtered AS
SELECT a.ts_ms, a.op, a.before, a.after, a.source
FROM <SRC> a
WHERE a.op IN ('d', 'u', 'c', 'r');

SELECT b.before.col1 AS before_col1,
       b.after.col1 AS after_col1,
       b.ts_ms, b.op, b.before, b.after, b.source
FROM dbz_filtered b;

It seems the additional columns extracted from the 'before' and 'after' envelopes are being ignored for some reason. I even tried specifying a static column like 'test' AS test_column, and it was not written to the Hudi table either.

Properties File

hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator
hoodie.datasource.write.recordkey.field=op
hoodie.datasource.write.partitionpath.field=op
hoodie.datasource.write.precombine.field=ts_ms
hoodie.streamer.transformer.sql.file=s3://some_bucket/configs/some_table.sql
hoodie.streamer.schemaprovider.registry.url=http://localhost:8081/subjects/test.dbo.some_table-value/versions/latest
schema.registry.url=http://localhost:8081
hoodie.streamer.source.kafka.topic=test.dbo.some_table
bootstrap.servers=localhost:9092
auto.offset.reset=earliest

Spark Submit

spark-submit \
  --packages org.apache.hudi:hudi-spark3.5-bundle_2.12:0.15.0,org.apache.hudi:hudi-aws-bundle:0.15.0 \
  --conf spark.streaming.kafka.allowNonConsecutiveOffsets=true \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --class org.apache.hudi.utilities.streamer.HoodieStreamer s3://some_bucket/jars/hudi-utilities-slim-bundle.jar \
  --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
  --props s3://some_bucket/configs/some_table.properties \
  --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
  --enable-sync \
  --source-ordering-field ts_ms \
  --target-base-path s3://some_bucket/some_folder/some_table \
  --target-table dbo.some_table \
  --table-type COPY_ON_WRITE \
  --sync-tool-classes org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool \
  --op BULK_INSERT \
  --transformer-class org.apache.hudi.utilities.transform.SqlFileBasedTransformer

Is there something else that I might be missing?

Thank you


wombatu-kun commented on August 27, 2024

As I see from the documentation here: https://hudi.apache.org/docs/hoodie_streaming_ingestion#schema-providers

By default, Spark will infer the schema of the source and use that inferred schema when writing to a table...

Maybe you should add a target schema with the additional fields to the schema registry and provide the URL to that schema in the config hoodie.streamer.schemaprovider.registry.targetUrl.

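To make that concrete, a minimal sketch of a flattened target schema that could be registered for the transformer output shown above; the record name, field types, and nullable unions are assumptions, not taken from the thread:

{
  "type": "record",
  "name": "some_table_flattened",
  "fields": [
    {"name": "before_col1", "type": ["null", "string"], "default": null},
    {"name": "after_col1", "type": ["null", "string"], "default": null},
    {"name": "ts_ms", "type": "long"},
    {"name": "op", "type": "string"}
  ]
}

Once registered, hoodie.streamer.schemaprovider.registry.targetUrl would point at that subject's latest version, as in the resolution at the top of this thread.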

YousifS7 commented on August 27, 2024

@wombatu-kun Couldn't that be extracted from the Transformer?

SqlFileBasedTransformer

As I understand it, the Transformer sits between the Source and the Target, yet for some reason my additional fields are being ignored. I included several fields in my SQL transformer, but none of the additional fields were written to the Hudi table.

Thank you
