altinity / clickhouse-sink-connector

Replicate data from MySQL, Postgres and MongoDB to ClickHouse

Home Page: https://www.altinity.com

License: Apache License 2.0

Java 28.24% Shell 2.12% Python 63.89% Dockerfile 0.19% Smarty 0.07% ANTLR 5.21% JavaScript 0.11% Go 0.16%
avro kafka kafka-connect clickhouse postgresql replication cdc debezium mysql mongo

clickhouse-sink-connector's Introduction


Latest Releases

https://github.com/Altinity/clickhouse-sink-connector/releases

Altinity Sink Connector for ClickHouse

The Altinity Sink Connector moves data automatically from transactional database tables in MySQL and PostgreSQL to ClickHouse for analysis.

Features

  • Initial data dump and load
  • Change data capture of new transactions using Debezium
  • Automatic loading into ClickHouse
  • Sources: Support for MySQL, PostgreSQL (other databases experimental)
  • Target: Support for ClickHouse ReplacingMergeTree
  • Able to recover/restart from failures on source or target
  • Handle upstream schema changes automatically
  • Checksum-based table comparisons
  • Scalable to 1000s of tables
  • Multiple deployment models
    • Lightweight: single process that transfers from source to target.
    • Kafka: separate source and target processes using Kafka as transport.
  • Distribution as Docker container

Getting Started

QuickStart Guide: Lightweight (MySQL)
QuickStart Guide: Lightweight (PostgreSQL)
QuickStart Guide: Kafka

Blog Articles

The first two are good tutorials on MySQL and PostgreSQL, respectively.

Reference Documentation

General

Operations

Development

Roadmap

2024 Roadmap

Help

File an issue or contact us on the Altinity public Slack workspace. Use the link on the Slack badge at the top of this page.

Contributing

Contributions to the project are welcome in any form.

  • Submit issues documenting feature requests and bugs
  • Submit PRs to make changes
  • Talk about the project, write blog articles, or give presentations

We recommend that you file an issue before implementing feature additions or major fixes. We are happy to provide guidance and encouragement!

Commercial Support

Altinity is the primary maintainer of the Sink Connector. It is used together with Altinity.Cloud as well as self-managed ClickHouse installations. Altinity offers a range of software and services related to ClickHouse and analytic applications built on ClickHouse.

  • Official website - Get a high level overview of Altinity and our offerings.
  • Altinity.Cloud - Run ClickHouse in our cloud or yours.
  • Altinity Support - Get Enterprise-class support for ClickHouse and Sink Connector.
  • Slack - Talk directly with ClickHouse users and Altinity devs.
  • Contact us - Contact Altinity with your questions or issues.
  • Free consultation - Get a free consultation with a ClickHouse expert today.

clickhouse-sink-connector's People

Contributors

aadant, adamkatzdev, alex-zaitsev, almostivansidorov, bader-tayeb, dougtidwell, hodgesrm, ilyatsoi, minguyen9988, myrotk, raphaelauv, selfeer, subkanthi, sunsingerus, vzakaznikov


clickhouse-sink-connector's Issues

Version (ver) column becomes the current epoch time

(screenshot omitted)

Here is the source connector config:

#!/bin/bash
CONNECTOR_NAME="source"
HOST="mysql"
PORT="3306"
USER="root"
PASSWORD="root"
DBS="employees"
DATABASE_SERVER_ID="5432"
DATABASE_SERVER_NAME="SERVER5432"
CONNECTOR_CLASS="io.debezium.connector.mysql.MySqlConnector"

KAFKA_BOOTSTRAP_SERVERS="kafka:9092"
KAFKA_TOPIC="schema-changes.employees"
SNAPSHOT_MODE="initial"

CONNECTORS_MANAGEMENT_URL="http://connect:8083/connectors/"

cat <<EOF | curl --request POST --url "${CONNECTORS_MANAGEMENT_URL}" --header 'Content-Type: application/json' --data @-
    {
        "name": "${CONNECTOR_NAME}",
        "config": {
            "connector.class": "${CONNECTOR_CLASS}",
            "tasks.max": "1",
            "snapshot.mode": "${SNAPSHOT_MODE}",
            "snapshot.locking.mode": "none",
            "snapshot.delay.ms": 10000,
            "include.schema.changes":"true",
            "include.schema.comments": "true",
            "database.hostname": "${HOST}",
            "database.port": "${PORT}",
            "database.user": "${USER}",
            "database.password": "${PASSWORD}",
            "database.server.id": "${DATABASE_SERVER_ID}",
            "database.server.name": "${DATABASE_SERVER_NAME}",
            "database.whitelist": "${DBS}",
            "database.allowPublicKeyRetrieval":"true",
            "database.history.kafka.bootstrap.servers": "${KAFKA_BOOTSTRAP_SERVERS}",
            "database.history.kafka.topic": "${KAFKA_TOPIC}",
            "key.converter": "io.confluent.connect.avro.AvroConverter",
            "value.converter": "io.confluent.connect.avro.AvroConverter",
            "key.converter.schema.registry.url": "http://schemaregistry:8081",
            "value.converter.schema.registry.url":"http://schemaregistry:8081",
            "topic.creation.$alias.partitions": 1,
            "topic.creation.default.replication.factor": 1,
            "topic.creation.default.partitions": 1,
            "provide.transaction.metadata": "true",
            "max.batch.size": 128000,
            "max.queue.size": 512000
        }
    }
EOF

Here is the sink connector config:

#!/bin/bash

CLICKHOUSE_HOST="clickhouse01"
CLICKHOUSE_PORT=8123
CLICKHOUSE_USER="default"
CLICKHOUSE_PASSWORD="default"
CLICKHOUSE_TABLE="employees"
CLICKHOUSE_DATABASE="default"

CONNECTOR_NAME="sink"

TOPICS="SERVER5432.employees.employees, SERVER5432.employees.salaries, SERVER5432.employees.titles, SERVER5432.employees.departments, SERVER5432.employees.dept_emp, SERVER5432.employees.dept_manager"
#TOPICS="SERVER5432.employees.ontime"
TOPICS_TABLE_MAP="SERVER5432.employees.employees:employees, SERVER5432.employees.salaries:salaries, SERVER5432.employees.titles:titles, SERVER5432.employees.departments:departments, SERVER5432.employees.dept_emp:dept_emp, SERVER5432.employees.dept_manager:dept_manager"

CONNECTORS_MANAGEMENT_URL="http://sink:8083/connectors/"
cat <<EOF | curl --request POST --url "${CONNECTORS_MANAGEMENT_URL}" --header 'Content-Type: application/json' --data @-
    {
        "name": "${CONNECTOR_NAME}",
        "config": {
            "connector.class": "com.altinity.clickhouse.sink.connector.ClickHouseSinkConnector",
            "tasks.max": "20",
            "topics": "${TOPICS}",
            "clickhouse.topic2table.map": "${TOPICS_TABLE_MAP}",
            "clickhouse.server.url": "${CLICKHOUSE_HOST}",
            "clickhouse.server.database": "${CLICKHOUSE_DATABASE}",
            "clickhouse.server.port": ${CLICKHOUSE_PORT},
            "clickhouse.server.user": "${CLICKHOUSE_USER}",
            "clickhouse.server.pass": "${CLICKHOUSE_PASSWORD}",
            "clickhouse.table.name": "${CLICKHOUSE_TABLE}",
            "key.converter": "io.confluent.connect.avro.AvroConverter",
            "value.converter": "io.confluent.connect.avro.AvroConverter",
            "key.converter.schema.registry.url": "http://schemaregistry:8081",
            "value.converter.schema.registry.url":"http://schemaregistry:8081",
            "store.kafka.metadata": true,
            "topic.creation.default.partitions": 6,
            "store.raw.data": false,
            "store.raw.data.column": "raw_data",
            "buffer.flush.time.ms": 500,
            "thread.pool.size": 1,
            "fetch.min.bytes": 52428800,
            "enable.kafka.offset": false,
            "replacingmergetree.delete.column": "sign",
            "auto.create.tables": true,
            "schema.evolution": false,
            "deduplication.policy": "off"
        }
    }
EOF

After I change some of the data in MySQL and query ClickHouse without the FINAL keyword, this shows up:
(screenshot omitted)

With the FINAL keyword:
(screenshot omitted)

Can you help me figure out what I am doing wrong here? Appreciate it, thanks!
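For context, a minimal sketch of why this happens (hypothetical table and column names, not the reporter's actual schema): ReplacingMergeTree keeps every row version until a background merge runs, so without FINAL both the old and the updated version are visible, while FINAL deduplicates at query time on the version column (ver).

-- Simplified stand-in for an auto-created table, with ver as the version column.
CREATE TABLE demo_employees
(
    emp_no Int32,
    first_name String,
    sign Int8,
    ver UInt64
)
ENGINE = ReplacingMergeTree(ver)
ORDER BY emp_no;

-- Without FINAL, both the old and the updated row version may be returned
-- until a background merge collapses them.
SELECT * FROM demo_employees WHERE emp_no = 1;

-- FINAL forces deduplication at query time, so only the row with the
-- highest ver per emp_no is returned.
SELECT * FROM demo_employees FINAL WHERE emp_no = 1;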

Add sakila and employees databases

Add the sakila and employees databases to the initial MySQL scripts.
Add Python scripts to validate counts (or use the checksum utility when it's available).

Support primary key updates and changes

https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-primary-key-updates

Sysbench read_write_load_test seems to create primary key update records which are not currently handled in the connector.

From Debezium documentation.
Primary key updates
An UPDATE operation that changes a row's primary key field(s) is known as a primary key change. For a primary key change, in place of an UPDATE event record, the connector emits a DELETE event record for the old key and a CREATE event record for the new (updated) key. These events have the usual structure and content, and in addition, each one has a message header related to the primary key change:

The DELETE event record has __debezium.newkey as a message header. The value of this header is the new primary key for the updated row.

The CREATE event record has __debezium.oldkey as a message header. The value of this header is the previous (old) primary key that the updated row had.

PRIMARY KEY CHANGES
We need to test the scenario when the primary key is changed.


-- Remove the old PK
ALTER TABLE table_name DROP PRIMARY KEY;
-- Add the new PK
ALTER TABLE table_name ADD PRIMARY KEY (Id);
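A minimal sketch of a row-level primary key update (hypothetical values), which is the case Debezium turns into a DELETE event for the old key plus a CREATE event for the new key:

-- Assumption: table_name has primary key column Id.
-- Changing the key of an existing row is a "primary key change":
-- Debezium emits a DELETE for Id = 1 and a CREATE for Id = 100,
-- carrying the __debezium.newkey / __debezium.oldkey message headers.
UPDATE table_name SET Id = 100 WHERE Id = 1;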

Kubernetes enhancements

Kubernetes yaml manifest file updates

  • Schema Registry (Confluent)
  • Redpanda production configuration
  • Grafana/Prometheus
  • Console (Redpanda UI)

Wrong mapping in auto-created tables on "altinity/clickhouse-sink-connector:2022-08-02"

All "INT UNSIGNED" types should map to "UInt*" (in the current version this is critical for large values of "BIGINT UNSIGNED", since "Int64" is not big enough for them), "TINYINT" should map to "Int8", and "CHAR" may be a better fit for "LowCardinality(String)". The same issue applies to Nullable types. A sketch of the suggested mapping follows the DDL dumps below.

Current situation for all data types:

MySQL:

| users | CREATE TABLE users (
id int NOT NULL AUTO_INCREMENT,
D4 decimal(2,1) NOT NULL,
D5 decimal(30,10) NOT NULL,
Doublex double NOT NULL,
x_date date NOT NULL,
x_datetime6 datetime(6) NOT NULL,
x_time time NOT NULL,
x_time6 time(6) NOT NULL,
Intmin int NOT NULL,
Intmax int NOT NULL,
UIntmin int unsigned NOT NULL,
UIntmax int unsigned NOT NULL,
BIGIntmin bigint NOT NULL,
BIGIntmax bigint NOT NULL,
UBIGIntmin bigint unsigned NOT NULL,
UBIGIntmax bigint unsigned NOT NULL,
TIntmin tinyint NOT NULL,
TIntmax tinyint NOT NULL,
UTIntmin tinyint unsigned NOT NULL,
UTIntmax tinyint unsigned NOT NULL,
SIntmin smallint NOT NULL,
SIntmax smallint NOT NULL,
USIntmin smallint unsigned NOT NULL,
USIntmax smallint unsigned NOT NULL,
MIntmin mediumint NOT NULL,
MIntmax mediumint NOT NULL,
UMIntmin mediumint unsigned NOT NULL,
UMIntmax mediumint unsigned NOT NULL,
x_char char(1) NOT NULL,
x_text text NOT NULL,
x_varchar varchar(4) NOT NULL,
x_Blob blob NOT NULL,
x_Mediumblob mediumblob NOT NULL,
x_Longblob longblob NOT NULL,
x_binary binary(1) NOT NULL,
x_varbinary varbinary(4) NOT NULL,
PRIMARY KEY (id)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb3

Auto-created ClickHouse table:
CREATE TABLE test.users
(
id Int32,
D4 Decimal(2, 1),
D5 Decimal(30, 10),
Doublex Float64,
x_date Date32,
x_datetime6 String,
x_time String,
x_time6 String,
Intmin Int32,
Intmax Int32,
UIntmin Int64,
UIntmax Int64,
BIGIntmin Int64,
BIGIntmax Int64,
UBIGIntmin Int64,
UBIGIntmax Int64,
TIntmin Int16,
TIntmax Int16,
UTIntmin Int16,
UTIntmax Int16,
SIntmin Int16,
SIntmax Int16,
USIntmin Int32,
USIntmax Int32,
MIntmin Int32,
MIntmax Int32,
UMIntmin Int32,
UMIntmax Int32,
x_char String,
x_text String,
x_varchar String,
x_Blob String,
x_Mediumblob String,
x_Longblob String,
x_binary String,
x_varbinary String,
sign Int8,
ver UInt64
)
ENGINE = ReplacingMergeTree(ver)
PRIMARY KEY id
ORDER BY id
SETTINGS index_granularity = 8192

The Nullable variant shows the same issue:

CREATE TABLE test.users
(
id Int32,
D4 Nullable(Decimal(2, 1)),
D5 Nullable(Decimal(30, 10)),
Doublex Nullable(Float64),
x_date Nullable(Date32),
x_datetime6 Nullable(String),
x_time Nullable(String),
x_time6 Nullable(String),
Intmin Nullable(Int32),
Intmax Nullable(Int32),
UIntmin Nullable(Int64),
UIntmax Nullable(Int64),
BIGIntmin Nullable(Int64),
BIGIntmax Nullable(Int64),
UBIGIntmin Nullable(Int64),
UBIGIntmax Nullable(Int64),
TIntmin Nullable(Int16),
TIntmax Nullable(Int16),
UTIntmin Nullable(Int16),
UTIntmax Nullable(Int16),
SIntmin Nullable(Int16),
SIntmax Nullable(Int16),
USIntmin Nullable(Int32),
USIntmax Nullable(Int32),
MIntmin Nullable(Int32),
MIntmax Nullable(Int32),
UMIntmin Nullable(Int32),
UMIntmax Nullable(Int32),
x_char Nullable(String),
x_text Nullable(String),
x_varchar Nullable(String),
x_Blob Nullable(String),
x_Mediumblob Nullable(String),
x_Longblob Nullable(String),
x_binary Nullable(String),
x_varbinary Nullable(String),
sign Int8,
ver UInt64
)
ENGINE = ReplacingMergeTree(ver)
PRIMARY KEY id
ORDER BY id
SETTINGS index_granularity = 8192
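A sketch of the mapping the reporter is asking for, limited to the problematic columns (hypothetical table name; the rest of the DDL is assumed unchanged):

CREATE TABLE test.users_suggested
(
    id Int32,
    UIntmax UInt32,                 -- int unsigned       -> UInt32
    UBIGIntmax UInt64,              -- bigint unsigned    -> UInt64
    TIntmax Int8,                   -- tinyint            -> Int8
    UTIntmax UInt8,                 -- tinyint unsigned   -> UInt8
    USIntmax UInt16,                -- smallint unsigned  -> UInt16
    UMIntmax UInt32,                -- mediumint unsigned -> UInt32
    x_char LowCardinality(String),  -- char(1), per the reporter's suggestion
    sign Int8,
    ver UInt64
)
ENGINE = ReplacingMergeTree(ver)
ORDER BY id;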

Ingestion bugs

  • Discuss the logic of the PreparedStatement.executeBatch return value. The return value is an array of update counts; an element greater than 0 means the corresponding statement succeeded.

DDL support: Support for ALTER TABLE MODIFY

When MySQL table column data types are modified using ALTER TABLE, that change needs to be replicated to ClickHouse: the sink connector needs to perform ALTER TABLE ... MODIFY COLUMN to update the column's data type.
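A sketch of the pair of statements involved, using the salaries table from the sample employees database as a stand-in (hypothetical column change):

-- MySQL: change a (non-key) column's data type
ALTER TABLE salaries MODIFY COLUMN salary BIGINT;

-- ClickHouse: the corresponding DDL the sink connector would need to issue
ALTER TABLE salaries MODIFY COLUMN salary Int64;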

JSON data type support

Both MySQL JSON and Postgres JSON/JSONB map to String with the schema name io.debezium.data.Json.

For now, JSON columns are mapped to String; there seems to be a bug in the JDBC driver/ClickHouse server around supporting batch inserts with JSON columns.

ClickHouse/clickhouse-java#1029

Metrics flush - logic

Use a separate thread, with a user-configurable parameter to set the flush interval, for flushing metrics to Prometheus.

Loss of milliseconds in datetime(6) and time(6) replication from MySQL to ClickHouse

For the inserted value '2018-09-08 17:51:04.777' in a MySQL datetime(6) column, the stored result is '2018-09-08 17:51:04.777000', but after transfer to a ClickHouse DateTime64(6) column the result is '2018-09-08 17:51:04.000000'; if we use String instead, it is '2018-09-08 17:51:04'. It looks like we lose the fractional seconds (same for time(6): '17:51:04.77' in MySQL is just '17:51:04' in CH).

Some schema info:
I create the MySQL table:
CREATE TABLE IF NOT EXISTS {table_name}
(id INT NOT NULL,
x_date DATE NOT NULL,
x_datetime6 DATETIME(6),
x_time TIME,
x_time6 TIME(6), PRIMARY KEY (id));

I create ClickHouse replica:
CREATE TABLE IF NOT EXISTS {table_name}
(id Int32,
x_date Date,
x_datetime6 String,
x_time String,
x_time6 String)
ENGINE = MergeTree
PRIMARY KEY id ORDER BY id SETTINGS
index_granularity = 8192;

I insert data into the MySQL table:
date = "1,'2012-12-12','2018-09-08 17:51:04.777','17:51:04.777', '17:51:04.777'"
INSERT INTO {table_name} VALUES ({date});
I check the ClickHouse table dataset:
output_data: '1,"2012-12-12","2018-09-08 17:51:04","17:51:05","17:51:04"'
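For reference, a minimal sketch (hypothetical table name) showing that a ClickHouse DateTime64(6) column holds full sub-second precision when the literal reaches the server intact, which helps isolate the loss to the transfer path rather than the column type:

-- Assumption: the target column is DateTime64(6) rather than String.
CREATE TABLE precision_check
(
    id Int32,
    x_datetime6 DateTime64(6)
)
ENGINE = MergeTree
ORDER BY id;

INSERT INTO precision_check VALUES (1, '2018-09-08 17:51:04.777');

-- Returns 2018-09-08 17:51:04.777000, i.e. the fractional part is preserved.
SELECT x_datetime6 FROM precision_check;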

Customers table error.

The customers table data set is not copied correctly to ClickHouse because of the following error.

java.sql.SQLException: Missing value for parameter #1

Sysbench count does not match

./run_sysbench_tests.sh -t oltp_insert
looks good
./run_sysbench_tests.sh -t oltp_update_index

docker exec -it clickhouse clickhouse-client -uroot --password root --query "select count(*) from test.sbtest1 final"
<jemalloc>: Number of CPUs detected is not deterministic. Per-CPU arena disabled.
823882
docker exec -it mysql-master mysql -uroot -proot -e "select count(*) from sbtest.sbtest1"
+----------+
| count(*) |
+----------+
|      100 |
+----------+

MySQL BIGINT to ClickHouse Int64 doesn't work correctly with the "auto.create.tables": true setting.

It looks like the Kafka/Debezium path or the sink only allows values in the [-4294967294, 4294967295] interval, whereas manually created tables in MySQL and ClickHouse accept the expected ['-9223372036854775808', '9223372036854775807'] range; the full chain only allows [-4294967294, 4294967295].
Other types look correct for edge and some mid-range values.
Docker version: altinity/clickhouse-sink-connector:2022-07-21
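A quick sketch confirming that ClickHouse itself accepts the full Int64 range, which points at the Kafka/Avro path rather than the target table (hypothetical table name):

CREATE TABLE bigint_check (v Int64) ENGINE = MergeTree ORDER BY v;

INSERT INTO bigint_check VALUES
    (toInt64('-9223372036854775808')),
    (toInt64('9223372036854775807'));

-- Returns the full Int64 edge values unchanged.
SELECT min(v), max(v) FROM bigint_check;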

Exception when metrics are disabled

When metrics are disabled, the flag is not respected and the application tries to update the metrics counters.

    metrics.enable: "false"
java.lang.NullPointerException
	at com.altinity.clickhouse.sink.connector.common.Metrics.updateCounters(Metrics.java:226)
	at com.altinity.clickhouse.sink.connector.db.DbWriter.addToPreparedStatementBatch(DbWriter.java:456)
	at com.altinity.clickhouse.sink.connector.executor.ClickHouseBatchRunnable.flushRecordsToClickHouse(ClickHouseBatchRunnable.java:228)
	at com.altinity.clickhouse.sink.connector.executor.ClickHouseBatchRunnable.processRecordsByTopic(ClickHouseBatchRunnable.java:189)
	at com.altinity.clickhouse.sink.connector.executor.ClickHouseBatchRunnable.run(ClickHouseBatchRunnable.java:105)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

"auto.create.tables": true setting doesn't work correct (altinity/clickhouse-sink-connector:2022-07-26)

Looks like auto creation with the first "INSERT" in one step doesn't work correct (but creation of table in ClickHouse works correct if we don't drop table after first insert which is always wrong, all other inserts work correct). I tried with different variants of sink start up.

Sink ver.:
altinity/clickhouse-sink-connector:2022-07-26

Some retries and results:
Case 1:
1) restart sink / no restart of sink
2) sysbench "load insert" test + "auto.create.tables": true in the sink
3) optimize the CH table
4) drop the table at the end
Result: data in MySQL and CH is always different.

Case 2:
1) restart sink / no restart of sink
2) sysbench "load insert" test + "auto.create.tables": true in the sink
3) optimize the CH table
4) truncate the table at the end
Result: data in MySQL and CH differs on the first run, then is always the same.

Loss of data on sink connector start-up.

It looks like if we start INSERTing data into MySQL too soon after the sink connector starts up with "auto.create.tables: true", we can lose data in the current sink connector composition (confluentinc/cp-schema-registry:latest & altinity/clickhouse-sink-connector:2022-08-02).

It is reproducible on the "oltp_delete" sysbench test, for example.

Support for nullable JSON fields

Example from world_x database

2 issues

  • DB::Exception: Nested type Object('json') cannot be inside Nullable type.
  • JSON is currently experimental: set allow_experimental_object_type = 1 to create the table
Table from world_x:

 CREATE TABLE `countryinfo` (
  `doc` json DEFAULT NULL,
  `_id` varbinary(32) GENERATED ALWAYS AS (json_unquote(json_extract(`doc`,_utf8mb4'$._id'))) STORED NOT NULL,
  `_json_schema` json GENERATED ALWAYS AS (_utf8mb4'{"type":"object"}') VIRTUAL,
  PRIMARY KEY (`_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci


CREATE TABLE countryinfo(`doc` JSON NULL,`_id` String NOT NULL,`_json_schema` JSON NULL,`sign` Int8,`ver` UInt64) ENGINE = ReplacingMergeTree(ver) PRIMARY KEY(_id) ORDER BY(_id)
 
2022-09-11 22:18:54,460 ERROR  ||  **** Error creating table ***countryinfo   [com.altinity.clickhouse.sink.connector.db.DbWriter]
java.sql.SQLException: Code: 43. DB::Exception: Nested type Object('json') cannot be inside Nullable type. (ILLEGAL_TYPE_OF_ARGUMENT) (version 22.8.4.7 (official build))
, server ClickHouseNode [uri=http://clickhouse:8123/world_x, options={client_name=Agent_1}]@1498614193
        at com.clickhouse.jdbc.SqlExceptionUtils.handle(SqlExceptionUtils.java:85)
        at com.clickhouse.jdbc.SqlExceptionUtils.create(SqlExceptionUtils.java:31)
        at com.clickhouse.jdbc.SqlExceptionUtils.handle(SqlExceptionUtils.java:90)
        at com.clickhouse.jdbc.internal.ClickHouseStatementImpl.getLastResponse(ClickHouseStatementImpl.java:96)
        at com.clickhouse.jdbc.internal.ClickHouseStatementImpl.executeQuery(ClickHouseStatementImpl.java:304)
        at com.altinity.clickhouse.sink.connector.db.operations.ClickHouseTableOperationsBase.runQuery(ClickHouseTableOperationsBase.java:85)
        at com.altinity.clickhouse.sink.connector.db.operations.ClickHouseAutoCreateTable.createNewTable(ClickHouseAutoCreateTable.java:26)
        at com.altinity.clickhouse.sink.connector.db.DbWriter.<init>(DbWriter.java:97)
        at com.altinity.clickhouse.sink.connector.executor.ClickHouseBatchRunnable.getDbWriterForTable(ClickHouseBatchRunnable.java:143)
        at com.altinity.clickhouse.sink.connector.executor.ClickHouseBatchRunnable.processRecordsByTopic(ClickHouseBatchRunnable.java:168)
        at com.altinity.clickhouse.sink.connector.executor.ClickHouseBatchRunnable.run(ClickHouseBatchRunnable.java:105)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.IOException: Code: 43. DB::Exception: Nested type Object('json') cannot be inside Nullable type. (ILLEGAL_TYPE_OF_ARGUMENT) (version 22.8.4.7 (official build))

        at com.clickhouse.client.http.HttpUrlConnectionImpl.checkResponse(HttpUrlConnectionImpl.java:194)
        at com.clickhouse.client.http.HttpUrlConnectionImpl.post(HttpUrlConnectionImpl.java:289)
        at com.clickhouse.client.http.ClickHouseHttpClient.send(ClickHouseHttpClient.java:123)
        at com.clickhouse.client.AbstractClient.execute(AbstractClient.java:296)
        at com.clickhouse.client.ClickHouseClientBuilder$Agent.sendOnce(ClickHouseClientBuilder.java:284)
        at com.clickhouse.client.ClickHouseClientBuilder$Agent.send(ClickHouseClientBuilder.java:296)
        at com.clickhouse.client.ClickHouseClientBuilder$Agent.execute(ClickHouseClientBuilder.java:351)
        at com.clickhouse.client.ClickHouseClient.executeAndWait(ClickHouseClient.java:824)
        at com.clickhouse.client.ClickHouseRequest.executeAndWait(ClickHouseRequest.java:1972)
        at com.clickhouse.jdbc.internal.ClickHouseStatementImpl.getLastResponse(ClickHouseStatementImpl.java:94)
        ... 13 more
		
Code: 44. DB::Exception: Received from localhost:9000. DB::Exception: Cannot create table with column 'doc' which type is 'Object('json')' because experimental Object type is not allowed. Set setting allow_experimental_object_type = 1 in order to allow it. (ILLEGAL_COLUMN)
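A sketch of a DDL variant that ClickHouse accepts (non-Nullable JSON columns plus the experimental setting); whether the connector should instead keep mapping JSON to String is the open question here:

-- The experimental Object('json') type has to be enabled first:
SET allow_experimental_object_type = 1;

-- Non-Nullable JSON columns are accepted; Nullable(JSON) is not.
CREATE TABLE countryinfo
(
    doc JSON,
    _id String,
    _json_schema JSON,
    sign Int8,
    ver UInt64
)
ENGINE = ReplacingMergeTree(ver)
ORDER BY _id;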

DateTime parsing Exception

UInt64,sign Int8') [com.altinity.clickhouse.sink.connector.db.DbWriter]
2022-08-26 02:46:17,652 ERROR || ******* ERROR inserting Batch ***************** [com.altinity.clickhouse.sink.connector.db.DbWriter]
java.time.format.DateTimeParseException: Text '2020-09-01T12:45:17' could not be parsed at index 10
at java.base/java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:2046)
at java.base/java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1948)
at java.base/java.time.LocalDateTime.parse(LocalDateTime.java:492)
at com.clickhouse.client.data.ClickHouseDateTimeValue.update(ClickHouseDateTimeValue.java:377)
at com.clickhouse.client.data.ClickHouseDateTimeValue.update(ClickHouseDateTimeValue.java:24)
at com.clickhouse.jdbc.internal.InputBasedPreparedStatement.setString(InputBasedPreparedStatement.java:265)
at com.altinity.clickhouse.sink.connector.db.DbWriter.insertPreparedStatement(DbWriter.java:630)
at com.altinity.clickhouse.sink.connector.db.DbWriter.addToPreparedStatementBatch(DbWriter.java:418)
at com.altinity.clickhouse.sink.connector.executor.ClickHouseBatchRunnable.flushRecordsToClickHouse(ClickHouseBatchRunnable.java:228)
at com.altinity.clickhouse.sink.connector.executor.ClickHouseBatchRunnable.processRecordsByTopic(ClickHouseBatchRunnable.java:189)
at com.altinity.clickhouse.sink.connector.executor.ClickHouseBatchRunnable.run(ClickHouseBatchRunnable.java:105)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
2022-08-26 02:46:17,654 ERROR || ClickHouseBatchRunnable exception - Task(0) [com.altinity.clickhouse.sink.connector.executor.ClickHouseBatchRunnable]
java.lang.NullPointerException
at com.altinity.clickhouse.sink.connector.common.Metrics.updateCounters(Metrics.java:226)
at com.altinity.clickhouse.sink.connector.db.DbWriter.addToPreparedStatementBatch(DbWriter.java:456)
at com.altinity.clickhouse.sink.connector.executor.ClickHouseBatchRunnable.flushRecordsToClickHouse(ClickHouseBatchRunnable.java:228)
at com.altinity.clickhouse.sink.connector.executor.ClickHouseBatchRunnable.processRecordsByTopic(ClickHouseBatchRunnable.java:189)
at com.altinity.clickhouse.sink.connector.executor.ClickHouseBatchRunnable.run(ClickHouseBatchRunnable.java:105)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
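The text '2020-09-01T12:45:17' fails at index 10, i.e. on the ISO-8601 'T' separator, inside the JDBC client's LocalDateTime parsing (see the stack trace above). A quick server-side sketch of the two forms:

-- Space-separated timestamps parse with the default DateTime conversion:
SELECT toDateTime('2020-09-01 12:45:17');

-- parseDateTimeBestEffort also accepts the ISO-8601 'T' separator:
SELECT parseDateTimeBestEffort('2020-09-01T12:45:17');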

Debezium table schema conflict after table recreation with different data types.

It looks like if we create a table, insert some data into it, then drop it in MySQL and on the ClickHouse side and recreate it (with the same table name and schema but a different column data type), in some cases this leads to a table schema conflict in Debezium that crashes it until manual recovery.

Some logs:
WorkerSourceTask{id=test-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted [org.apache.kafka.connect.runtime.WorkerTask]
debezium | org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
debezium | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:223)
debezium | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:149)
debezium | at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:330)
debezium | at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:356)
debezium | at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:258)
debezium | at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
debezium | at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
debezium | at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
debezium | at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
debezium | at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
debezium | at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
debezium | at java.base/java.lang.Thread.run(Thread.java:829)
debezium | Caused by: org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic SERVER5432.test.users1 :
debezium | at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:93)
debezium | at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:64)
debezium | at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$3(WorkerSourceTask.java:330)
debezium | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:173)
debezium | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:207)
debezium | ... 11 more
debezium | Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema{"type":"record","name":"Envelope","namespace":"SERVER5432.test.users1","fields":[{"name":"before","type":["null",{"type":"record","name":"Value","fields":[{"name":"id","type":"int"},{"name":"MyData","type":"int"}],"connect.name":"SERVER5432.test.users1.Value"}],"default":null},{"name":"after","type":["null","Value"],"default":null},{"name":"source","type":{"type":"record","name":"Source","namespace":"io.debezium.connector.mysql","fields":[{"name":"version","type":"string"},{"name":"connector","type":"string"},{"name":"name","type":"string"},{"name":"ts_ms","type":"long"},{"name":"snapshot","type":[{"type":"string","connect.version":1,"connect.parameters":{"allowed":"true,last,false,incremental"},"connect.default":"false","connect.name":"io.debezium.data.Enum"},"null"],"default":"false"},{"name":"db","type":"string"},{"name":"sequence","type":["null","string"],"default":null},{"name":"table","type":["null","string"],"default":null},{"name":"server_id","type":"long"},{"name":"gtid","type":["null","string"],"default":null},{"name":"file","type":"string"},{"name":"pos","type":"long"},{"name":"row","type":"int"},{"name":"thread","type":["null","long"],"default":null},{"name":"query","type":["null","string"],"default":null}],"connect.name":"io.debezium.connector.mysql.Source"}},{"name":"op","type":"string"},{"name":"ts_ms","type":["null","long"],"default":null},{"name":"transaction","type":["null",{"type":"record","name":"ConnectDefault","namespace":"io.confluent.connect.avro","fields":[{"name":"id","type":"string"},{"name":"total_order","type":"long"},{"name":"data_collection_order","type":"long"}]}],"default":null}],"connect.name":"SERVER5432.test.users1.Envelope"}
debezium | at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.toKafkaException(AbstractKafkaSchemaSerDe.java:259)
debezium | at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:156)
debezium | at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:153)
debezium | at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:86)
debezium | ... 15 more
debezium | Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema for subject "SERVER5432.test.users1-value", details: [Incompatibility{type:MISSING_UNION_BRANCH, location:/fields/0/type/1, message:reader union lacking writer type: RECORD, reader:["null",{"type":"record","name":"Value","namespace":"SERVER5432.test.users1","fields":[{"name":"id","type":"int"},{"name":"MyData","type":"int"}],"connect.name":"SERVER5432.test.users1.Value"}], writer:["null",{"type":"record","name":"Value","namespace":"SERVER5432.test.users1","fields":[{"name":"id","type":"int"},{"name":"MyData","type":"string"}],"connect.name":"SERVER5432.test.users1.Value"}]}, Incompatibility{type:MISSING_UNION_BRANCH, location:/fields/1/type/1, message:reader union lacking writer type: RECORD, reader:["null",{"type":"record","name":"Value","namespace":"SERVER5432.test.users1","fields":[{"name":"id","type":"int"},{"name":"MyData","type":"int"}],"connect.name":"SERVER5432.test.users1.Value"}], writer:["null",{"type":"record","name":"Value","namespace":"SERVER5432.test.users1","fields":[{"name":"id","type":"int"},{"name":"MyData","type":"string"}],"connect.name":"SERVER5432.test.users1.Value"}]}] io.confluent.kafka.schemaregistry.rest.exceptions.RestIncompatibleSchemaException: Schema being registered is incompatible with an earlier schema for subject "SERVER5432.test.users1-value", details: [Incompatibility{type:MISSING_UNION_BRANCH, location:/fields/0/type/1, message:reader union lacking writer type: RECORD, reader:["null",{"type":"record","name":"Value","namespace":"SERVER5432.test.users1","fields":[{"name":"id","type":"int"},{"name":"MyData","type":"int"}],"connect.name":"SERVER5432.test.users1.Value"}], writer:["null",{"type":"record","name":"Value","namespace":"SERVER5432.test.users1","fields":[{"name":"id","type":"int"},{"name":"MyData","type":"string"}],"connect.name":"SERVER5432.test.users1.Value"}]}, Incompatibility{type:MISSING_UNION_BRANCH, location:/fields/1/type/1, message:reader union lacking writer type: RECORD, reader:["null",{"type":"record","name":"Value","namespace":"SERVER5432.test.users1","fields":[{"name":"id","type":"int"},{"name":"MyData","type":"int"}],"connect.name":"SERVER5432.test.users1.Value"}], writer:["null",{"type":"record","name":"Value","namespace":"SERVER5432.test.users1","fields":[{"name":"id","type":"int"},{"name":"MyData","type":"string"}],"connect.name":"SERVER5432.test.users1.Value"}]}]
debezium | io.confluent.kafka.schemaregistry.rest.exceptions.RestIncompatibleSchemaException: Schema being registered is incompatible with an earlier schema for subject "SERVER5432.test.users1-value", details: [Incompatibility{type:MISSING_UNION_BRANCH, location:/fields/0/type/1, message:reader union lacking writer type: RECORD, reader:["null",{"type":"record","name":"Value","namespace":"SERVER5432.test.users1","fields":[{"name":"id","type":"int"},{"name":"MyData","type":"int"}],"connect.name":"SERVER5432.test.users1.Value"}], writer:["null",{"type":"record","name":"Value","namespace":"SERVER5432.test.users1","fields":[{"name":"id","type":"int"},{"name":"MyData","type":"string"}],"connect.name":"SERVER5432.test.users1.Value"}]}, Incompatibility{type:MISSING_UNION_BRANCH, location:/fields/1/type/1, message:reader union lacking writer type: RECORD, reader:["null",{"type":"record","name":"Value","namespace":"SERVER5432.test.users1","fields":[{"name":"id","type":"int"},{"name":"MyData","type":"int"}],"connect.name":"SERVER5432.test.users1.Value"}], writer:["null",{"type":"record","name":"Value","namespace":"SERVER5432.test.users1","fields":[{"name":"id","type":"int"},{"name":"MyData","type":"string"}],"connect.name":"SERVER5432.test.users1.Value"}]}]
debezium | at io.confluent.kafka.schemaregistry.rest.exceptions.Errors.incompatibleSchemaException(Errors.java:133)
