Comments (8)
Hi @wombatu-kun,
I wanted to provide an update on this issue. After some experimentation with the hoodie.streamer.schemaprovider.registry.targetUrl configuration, I was able to resolve the problem. Initially, I had assumed that Spark would infer the schema from the transformed data. Instead, I registered a custom flattened schema in my schema registry to match the desired output and pointed hoodie.streamer.schemaprovider.registry.targetUrl at it. Once I ran the job with these configurations, it worked.
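For anyone hitting the same problem, a minimal sketch of the two schema-provider properties, assuming the flattened target schema was registered under a hypothetical subject named test.dbo.some_table-flattened-value:
# source schema: the raw Debezium envelope (as already configured later in this thread)
hoodie.streamer.schemaprovider.registry.url=http://localhost:8081/subjects/test.dbo.some_table-value/versions/latest
# target schema: the flattened schema matching the transformer output (hypothetical subject name)
hoodie.streamer.schemaprovider.registry.targetUrl=http://localhost:8081/subjects/test.dbo.some_table-flattened-value/versions/latest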
Thank you
Try:
CACHE TABLE dbz_fields AS SELECT ts_ms, source_fields FROM dbz_events;
(source_fields without .*)
Hi @wombatu-kun, thank you for getting back to me.
I have modified the SQL file to the following:
CACHE TABLE dbz_filtered AS
SELECT ts_ms, op, before, after FROM <SRC> WHERE op IN ('d', 'u', 'c', 'r');
CACHE TABLE dbz_events AS
SELECT ts_ms,
       CASE WHEN op = 'd' THEN before ELSE after END AS source_fields,
       CASE WHEN op = 'd' THEN true ELSE false END AS is_deleted
FROM dbz_filtered;
CACHE TABLE dbz_fields AS
SELECT ts_ms, source_fields FROM dbz_events;
SELECT source_fields.*,
       Concat(source_fields.col1, source_fields.col2) AS hudi_key,
       YEAR(FROM_UNIXTIME(source_fields.col2 / 1000)) AS partition_path
FROM dbz_fields;
I'm still getting the same error:
HoodieKeyException: recordKey value: "null" for field: "hudi_key" cannot be null or empty.
Not sure if I'm missing something else.
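One possibility worth ruling out on the SQL side: Spark's CONCAT returns NULL if any argument is NULL, so a single row with a null col1 or col2 would trigger exactly this exception. A null-safe sketch of the last statement (same hypothetical column names; CONCAT_WS skips nulls rather than propagating them):
SELECT source_fields.*,
       CONCAT_WS('_', CAST(source_fields.col1 AS STRING), CAST(source_fields.col2 AS STRING)) AS hudi_key,
       YEAR(FROM_UNIXTIME(source_fields.col2 / 1000)) AS partition_path
FROM dbz_fields;
(Per the resolution at the top of the thread, the reported fix here was registering a target schema, but a null-safe key guards against the data-quality variant of the same error.)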
Thank you
What if you try this?
CACHE TABLE dbz_fields AS
SELECT ts_ms, source_fields FROM dbz_events;
SELECT s.*,
       Concat(s.col1, s.col2) AS hudi_key,
       YEAR(FROM_UNIXTIME(s.col2 / 1000)) AS partition_path
FROM dbz_fields s;
Trying the suggested SQL gives the following error:
Exception in thread "main" org.apache.spark.sql.AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `s`.`col1` cannot be resolved. Did you mean one of the following? [`s`.`ts_ms`, `s`.`source_fields`].
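The suggestion list in the error shows why: after the second CACHE TABLE, dbz_fields has only two top-level columns (ts_ms and source_fields), with col1 and col2 nested inside the struct. Qualifying through the struct should resolve, e.g. (a sketch):
SELECT s.source_fields.*,
       Concat(s.source_fields.col1, s.source_fields.col2) AS hudi_key,
       YEAR(FROM_UNIXTIME(s.source_fields.col2 / 1000)) AS partition_path
FROM dbz_fields s;
That is equivalent to the earlier unaliased query, so the original SQL was already syntactically fine; the null hudi_key evidently came from the schema side rather than the SQL.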
Following up on this: I was able to run the Spark app without errors. However, even when specifying columns from the 'before' and 'after' struct types extracted from Kafka, only the following columns were written to the Hudi table:
ts_ms, op, before, after, source
Transformer File Query
CACHE TABLE dbz_filtered AS
SELECT a.ts_ms, a.op, a.before, a.after, a.source FROM <SRC> a WHERE a.op IN ('d', 'u', 'c', 'r');
SELECT b.before.col1 AS before_col1,
       b.after.col1 AS after_col1,
       b.ts_ms, b.op, b.before, b.after, b.source
FROM dbz_filtered b;
It seems the additional columns extracted from the 'before' and 'after' envelope fields are being ignored for some reason. I even tried specifying a static column, like 'test' AS test_column, and it was not written to the Hudi table either.
Properties File
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator
hoodie.datasource.write.recordkey.field=op
hoodie.datasource.write.partitionpath.field=op
hoodie.datasource.write.precombine.field=ts_ms
hoodie.streamer.transformer.sql.file=s3://some_bucket/configs/some_table.sql
hoodie.streamer.schemaprovider.registry.url=http://localhost:8081/subjects/test.dbo.some_table-value/versions/latest
schema.registry.url=http://localhost:8081
hoodie.streamer.source.kafka.topic=test.dbo.some_table
bootstrap.servers=localhost:9092
auto.offset.reset=earliest
Spark Submit
spark-submit \
  --packages org.apache.hudi:hudi-spark3.5-bundle_2.12:0.15.0,org.apache.hudi:hudi-aws-bundle:0.15.0 \
  --conf spark.streaming.kafka.allowNonConsecutiveOffsets=true \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --class org.apache.hudi.utilities.streamer.HoodieStreamer s3://some_bucket/jars/hudi-utilities-slim-bundle.jar \
  --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
  --props s3://some_bucket/configs/some_table.properties \
  --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
  --enable-sync \
  --source-ordering-field ts_ms \
  --target-base-path s3://some_bucket/some_folder/some_table \
  --target-table dbo.some_table \
  --table-type COPY_ON_WRITE \
  --sync-tool-classes org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool \
  --op BULK_INSERT \
  --transformer-class org.apache.hudi.utilities.transform.SqlFileBasedTransformer
Is there something else that I might be missing?
Thank you
As I see from the documentation here: https://hudi.apache.org/docs/hoodie_streaming_ingestion#schema-providers
"By default, Spark will infer the schema of the source and use that inferred schema when writing to a table..."
Maybe you should add a target schema with the additional fields to the schema registry and provide the URL to that schema in the config hoodie.streamer.schemaprovider.registry.targetUrl.
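For illustration, a minimal sketch of what such a flattened target schema could look like in Avro (the record name, namespace, and field list are hypothetical; it must cover exactly the columns the last SQL statement emits):
{
  "type": "record",
  "name": "some_table_flattened",
  "namespace": "test.dbo",
  "fields": [
    {"name": "ts_ms", "type": "long"},
    {"name": "op", "type": "string"},
    {"name": "before_col1", "type": ["null", "string"], "default": null},
    {"name": "after_col1", "type": ["null", "string"], "default": null}
  ]
}
Once registered under some subject (for Confluent-compatible registries, via the POST /subjects/<subject>/versions endpoint), its .../versions/latest URL goes into hoodie.streamer.schemaprovider.registry.targetUrl, which matches the resolution at the top of this thread.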
@wombatu-kun Couldn't that be extracted from the Transformer?
As I understand it, the Transformer sits between the Source and Target. For some reason my additional fields are being ignored.
I included several fields in my SQL transformer, but none of the additional fields were written to the Hudi table.
Thank you