satyajitv / debeziumtimestampconvertor
Kafka SMT implementation of Debezium timestamp convertor
When the field is not null, the converter sets optional = false in the schema, overriding the value that comes from the database. Is it possible to keep the original value?
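For reference, a minimal sketch (not the project's actual code) of how an SMT could carry over the source column's optionality when it swaps a field's type for string, assuming the standard Kafka Connect SchemaBuilder API:

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

public final class OptionalityPreservingConversion {

    // Build the replacement string schema for a converted field, copying the
    // optional flag (and any default value) from the original field schema
    // instead of always emitting optional = false.
    static Schema convertedFieldSchema(Schema original) {
        SchemaBuilder builder = SchemaBuilder.string();
        if (original.isOptional()) {
            builder.optional(); // keep the column's NULL-ability from the database
        }
        if (original.defaultValue() != null) {
            builder.defaultValue(original.defaultValue().toString());
        }
        return builder.build();
    }
}
```

The key point is that the new schema is derived from `original.isOptional()` rather than being hard-coded, so a nullable database column stays nullable after conversion.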
Debezium MySQL Source Connector Configuration
{
  "name": "cdc-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schemas.enable": "true",
    "value.converter.schema.registry.url": "",
    "value.converter.basic.auth.credentials.source": "USER_INFO",
    "value.converter.basic.auth.user.info": "",
    "database.hostname": "host.docker.internal",
    "database.port": "3306",
    "database.user": "demo",
    "database.password": "",
    "database.server.id": "223344",
    "database.server.name": "dbs1",
    "table.include.list": "demo.Events",
    "include.query": "true",
    "database.history.kafka.bootstrap.servers": "",
    "database.history.consumer.security.protocol": "SASL_SSL",
    "database.history.consumer.sasl.mechanism": "PLAIN",
    "database.history.consumer.sasl.jaas.config": "****",
    "database.history.producer.security.protocol": "SASL_SSL",
    "database.history.producer.sasl.mechanism": "PLAIN",
    "database.history.producer.sasl.jaas.config": "******",
    "database.history.kafka.topic": "dbhistory.demo2",
    "database.allowPublicKeyRetrieval": true,
    "topic.creation.default.replication.factor": "3",
    "topic.creation.default.partitions": "3",
    "decimal.handling.mode": "double",
    "include.schema.changes": "true",
    "transforms": "addTopicPrefix,TimestampConv",
    "transforms.addTopicPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.addTopicPrefix.regex": "(.*)",
    "transforms.addTopicPrefix.replacement": "mysql-debezium-$1",
    "snapshot.mode": "when_needed",
    "transforms.TimestampConv.type": "org.telmate.SMT.DebeziumTimestampConverter$Value",
    "transforms.TimestampConv.target.type": "string",
    "transforms.TimestampConv.field.type": "io.debezium.time.Timestamp->string,io.debezium.time.Date->string,io.debezium.time.MicroTimestamp->string",
    "transforms.TimestampConv.struct.field": "after",
    "transforms.TimestampConv.timestamp.format": "yyyy-MM-dd hh:mm:ss"
  }
}
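When debugging a failure like this, it can help to surface the failing record immediately instead of letting the retry/tolerance machinery obscure it. A fragment using standard Kafka Connect error-handling properties (to be merged into the `config` block above):

```json
"errors.tolerance": "none",
"errors.log.enable": "true",
"errors.log.include.messages": "true"
```

With `errors.log.include.messages` enabled, the Connect worker logs the content of the record that caused the conversion to fail, which makes it easier to see which field the SMT mangled.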
When I run the connector with the above configuration, I get the following error:
[2022-12-15 07:59:06,125] ERROR [cdc-connector|task-0] WorkerSourceTask{id=cdc-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:195)
connect-107 | org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
connect-107 | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
connect-107 | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
connect-107 | at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:329)
connect-107 | at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:355)
connect-107 | at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:257)
connect-107 | at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
connect-107 | at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
connect-107 | at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
connect-107 | at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
connect-107 | at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
connect-107 | at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
connect-107 | at java.base/java.lang.Thread.run(Thread.java:829)
connect-107 | Caused by: org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic mysql-debezium-dbs1.demo.Events :
connect-107 | at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:91)
connect-107 | at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63)
connect-107 | at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$3(WorkerSourceTask.java:329)
connect-107 | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
connect-107 | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
connect-107 | ... 11 more
connect-107 | Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
connect-107 | at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:119)
connect-107 | at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:143)
connect-107 | at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:84)
connect-107 | ... 15 more
connect-107 | Caused by: org.apache.avro.SchemaParseException: Can't redefine: dbs1.demo.Events.Value
connect-107 | at org.apache.avro.Schema$Names.put(Schema.java:1511)
connect-107 | at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:782)
connect-107 | at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:943)
connect-107 | at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:971)
connect-107 | at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:955)
connect-107 | at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:1203)
connect-107 | at org.apache.avro.Schemas.toString(Schemas.java:46)
connect-107 | at org.apache.avro.Schemas.toString(Schemas.java:30)
connect-107 | at io.confluent.kafka.schemaregistry.avro.AvroSchema.canonicalString(AvroSchema.java:140)
connect-107 | at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:206)
connect-107 | at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:268)
connect-107 | at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:244)
connect-107 | at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:75)
connect-107 | ... 17 more
The connector works fine when the custom DebeziumTimestampConverter is not used, but it throws this error as soon as the custom converter is enabled.
Could you please help me with this issue?
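For context on the `Can't redefine: dbs1.demo.Events.Value` message: Avro allows a named record schema to be fully defined only once per schema document; any later occurrence must be a name reference. Debezium's envelope uses the same record name (`Value`) for both the `before` and `after` fields, so if an SMT rebuilds only the `after` struct's schema while keeping the original name, the Avro converter ends up writing two different full definitions of `dbs1.demo.Events.Value` and the parser rejects the second one. A hypothetical fragment illustrating the shape of the conflict (the `created_at` field is made up for illustration):

```json
{
  "type": "record",
  "name": "Envelope",
  "fields": [
    {"name": "before", "type": ["null", {
      "type": "record", "name": "Value", "namespace": "dbs1.demo.Events",
      "fields": [{"name": "created_at", "type": "long"}]
    }]},
    {"name": "after", "type": ["null", {
      "type": "record", "name": "Value", "namespace": "dbs1.demo.Events",
      "fields": [{"name": "created_at", "type": "string"}]
    }]}
  ]
}
```

Here the second `Value` definition (with `created_at` changed to `string`) redefines the first, which is exactly the `SchemaParseException` in the stack trace. A common remedy is for the SMT to rebuild every struct that shares the name, or to give the rewritten schema a distinct name.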