It looks like if we create a table, insert some rows into it, then drop it on both the MySQL side and the ClickHouse side and recreate it (with the same table name and schema, but a different column data type), in some cases this leads to a table schema conflict in Debezium, which crashes the connector task until it is manually recovered.
Some logs:
WorkerSourceTask{id=test-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted [org.apache.kafka.connect.runtime.WorkerTask]
debezium | org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
debezium | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:223)
debezium | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:149)
debezium | at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:330)
debezium | at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:356)
debezium | at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:258)
debezium | at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
debezium | at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
debezium | at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
debezium | at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
debezium | at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
debezium | at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
debezium | at java.base/java.lang.Thread.run(Thread.java:829)
debezium | Caused by: org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic SERVER5432.test.users1 :
debezium | at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:93)
debezium | at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:64)
debezium | at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$3(WorkerSourceTask.java:330)
debezium | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:173)
debezium | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:207)
debezium | ... 11 more
debezium | Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema{"type":"record","name":"Envelope","namespace":"SERVER5432.test.users1","fields":[{"name":"before","type":["null",{"type":"record","name":"Value","fields":[{"name":"id","type":"int"},{"name":"MyData","type":"int"}],"connect.name":"SERVER5432.test.users1.Value"}],"default":null},{"name":"after","type":["null","Value"],"default":null},{"name":"source","type":{"type":"record","name":"Source","namespace":"io.debezium.connector.mysql","fields":[{"name":"version","type":"string"},{"name":"connector","type":"string"},{"name":"name","type":"string"},{"name":"ts_ms","type":"long"},{"name":"snapshot","type":[{"type":"string","connect.version":1,"connect.parameters":{"allowed":"true,last,false,incremental"},"connect.default":"false","connect.name":"io.debezium.data.Enum"},"null"],"default":"false"},{"name":"db","type":"string"},{"name":"sequence","type":["null","string"],"default":null},{"name":"table","type":["null","string"],"default":null},{"name":"server_id","type":"long"},{"name":"gtid","type":["null","string"],"default":null},{"name":"file","type":"string"},{"name":"pos","type":"long"},{"name":"row","type":"int"},{"name":"thread","type":["null","long"],"default":null},{"name":"query","type":["null","string"],"default":null}],"connect.name":"io.debezium.connector.mysql.Source"}},{"name":"op","type":"string"},{"name":"ts_ms","type":["null","long"],"default":null},{"name":"transaction","type":["null",{"type":"record","name":"ConnectDefault","namespace":"io.confluent.connect.avro","fields":[{"name":"id","type":"string"},{"name":"total_order","type":"long"},{"name":"data_collection_order","type":"long"}]}],"default":null}],"connect.name":"SERVER5432.test.users1.Envelope"}
debezium | at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.toKafkaException(AbstractKafkaSchemaSerDe.java:259)
debezium | at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:156)
debezium | at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:153)
debezium | at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:86)
debezium | ... 15 more
debezium | Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema for subject "SERVER5432.test.users1-value", details: [Incompatibility{type:MISSING_UNION_BRANCH, location:/fields/0/type/1, message:reader union lacking writer type: RECORD, reader:["null",{"type":"record","name":"Value","namespace":"SERVER5432.test.users1","fields":[{"name":"id","type":"int"},{"name":"MyData","type":"int"}],"connect.name":"SERVER5432.test.users1.Value"}], writer:["null",{"type":"record","name":"Value","namespace":"SERVER5432.test.users1","fields":[{"name":"id","type":"int"},{"name":"MyData","type":"string"}],"connect.name":"SERVER5432.test.users1.Value"}]}, Incompatibility{type:MISSING_UNION_BRANCH, location:/fields/1/type/1, message:reader union lacking writer type: RECORD, reader:["null",{"type":"record","name":"Value","namespace":"SERVER5432.test.users1","fields":[{"name":"id","type":"int"},{"name":"MyData","type":"int"}],"connect.name":"SERVER5432.test.users1.Value"}], writer:["null",{"type":"record","name":"Value","namespace":"SERVER5432.test.users1","fields":[{"name":"id","type":"int"},{"name":"MyData","type":"string"}],"connect.name":"SERVER5432.test.users1.Value"}]}] io.confluent.kafka.schemaregistry.rest.exceptions.RestIncompatibleSchemaException: Schema being registered is incompatible with an earlier schema for subject "SERVER5432.test.users1-value", details: [Incompatibility{type:MISSING_UNION_BRANCH, location:/fields/0/type/1, message:reader union lacking writer type: RECORD, reader:["null",{"type":"record","name":"Value","namespace":"SERVER5432.test.users1","fields":[{"name":"id","type":"int"},{"name":"MyData","type":"int"}],"connect.name":"SERVER5432.test.users1.Value"}], 
writer:["null",{"type":"record","name":"Value","namespace":"SERVER5432.test.users1","fields":[{"name":"id","type":"int"},{"name":"MyData","type":"string"}],"connect.name":"SERVER5432.test.users1.Value"}]}, Incompatibility{type:MISSING_UNION_BRANCH, location:/fields/1/type/1, message:reader union lacking writer type: RECORD, reader:["null",{"type":"record","name":"Value","namespace":"SERVER5432.test.users1","fields":[{"name":"id","type":"int"},{"name":"MyData","type":"int"}],"connect.name":"SERVER5432.test.users1.Value"}], writer:["null",{"type":"record","name":"Value","namespace":"SERVER5432.test.users1","fields":[{"name":"id","type":"int"},{"name":"MyData","type":"string"}],"connect.name":"SERVER5432.test.users1.Value"}]}]
debezium | io.confluent.kafka.schemaregistry.rest.exceptions.RestIncompatibleSchemaException: Schema being registered is incompatible with an earlier schema for subject "SERVER5432.test.users1-value", details: [Incompatibility{type:MISSING_UNION_BRANCH, location:/fields/0/type/1, message:reader union lacking writer type: RECORD, reader:["null",{"type":"record","name":"Value","namespace":"SERVER5432.test.users1","fields":[{"name":"id","type":"int"},{"name":"MyData","type":"int"}],"connect.name":"SERVER5432.test.users1.Value"}], writer:["null",{"type":"record","name":"Value","namespace":"SERVER5432.test.users1","fields":[{"name":"id","type":"int"},{"name":"MyData","type":"string"}],"connect.name":"SERVER5432.test.users1.Value"}]}, Incompatibility{type:MISSING_UNION_BRANCH, location:/fields/1/type/1, message:reader union lacking writer type: RECORD, reader:["null",{"type":"record","name":"Value","namespace":"SERVER5432.test.users1","fields":[{"name":"id","type":"int"},{"name":"MyData","type":"int"}],"connect.name":"SERVER5432.test.users1.Value"}], writer:["null",{"type":"record","name":"Value","namespace":"SERVER5432.test.users1","fields":[{"name":"id","type":"int"},{"name":"MyData","type":"string"}],"connect.name":"SERVER5432.test.users1.Value"}]}]
debezium | at io.confluent.kafka.schemaregistry.rest.exceptions.Errors.incompatibleSchemaException(Errors.java:133)