itinycheng / flink-connector-clickhouse
Flink SQL connector for ClickHouse. Supports ClickHouseCatalog and reading/writing primitive data, maps, and arrays to ClickHouse.
License: Apache License 2.0
Since ClickHouse does not support transactions, how can exactly-once semantics be guaranteed when Flink writes to it?
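ClickHouse cannot participate in Flink's two-phase commit, so exactly-once at the sink is generally approximated as at-least-once delivery (checkpointing plus retries) landing in idempotent storage. A minimal sketch, assuming the sink table can be keyed on a unique id (table and column names here are hypothetical), uses ReplacingMergeTree so that duplicates written by retried batches collapse:

```sql
-- ClickHouse side: rows sharing the same sorting key are collapsed
-- during merges, keeping the one with the largest `updated` version
CREATE TABLE orders_sink
(
    id        UInt64,
    user_name String,
    updated   DateTime
)
ENGINE = ReplacingMergeTree(updated)
ORDER BY id;

-- read side: FINAL deduplicates rows that have not been merged yet
SELECT * FROM orders_sink FINAL;
```

This gives effectively-once results at read time rather than true exactly-once writes.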
Running in upsert mode throws an error; details are as follows:
160123 [clickhouse-batch-output-format-thread-1] ERROR org.apache.flink.connector.clickhouse.internal.executor.ClickHouseExecutor - ClickHouse executeBatch error, retry times = 0
java.sql.SQLSyntaxErrorException: Query must be like 'INSERT INTO [db.]table [(c1, c2, c3)] VALUES (?, ?, ?)'. Got: ALTER TABLE default.order UPDATE userName=?, orderID=? WHERE orderName=?
at ru.yandex.clickhouse.ClickHousePreparedStatementImpl.executeBatch(ClickHousePreparedStatementImpl.java:327)
at ru.yandex.clickhouse.ClickHousePreparedStatementImpl.executeBatch(ClickHousePreparedStatementImpl.java:320)
at org.apache.flink.connector.clickhouse.internal.executor.ClickHouseExecutor.attemptExecuteBatch(ClickHouseExecutor.java:53)
at org.apache.flink.connector.clickhouse.internal.executor.ClickHouseUpsertExecutor.executeBatch(ClickHouseUpsertExecutor.java:125)
at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.checkBeforeFlush(AbstractClickHouseOutputFormat.java:74)
at org.apache.flink.connector.clickhouse.internal.ClickHouseBatchOutputFormat.flush(ClickHouseBatchOutputFormat.java:93)
at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.lambda$scheduledFlush$0(AbstractClickHouseOutputFormat.java:59)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Does ClickHouse have a dialect?
Can this connector do ClickHouse CDC, i.e. listen for data changes in ClickHouse?
create table ods_mall_info (
id BIGINT,
name STRING,
shortName STRING,
mallType TINYINT,
phone STRING,
provinceCode STRING,
cityCode STRING,
countyCode STRING,
address STRING,
longitude Decimal(18,6),
latitude Decimal(18,6),
state STRING,
createTime TIMESTAMP,
createBy BIGINT,
image STRING,
codeImageCreateTime TIMESTAMP,
codeImageState STRING,
codeImage STRING,
documentType STRING,
socialCreditCode STRING,
licenseEffectiveDate STRING,
businessScope STRING,
ICPNumber STRING,
webAddress STRING,
registeredCaptial BIGINT,
paidCaptial BIGINT,
foundedDate STRING,
industry STRING,
registeredAddress STRING,
businessAddress STRING,
socialCreditImage STRING,
corporateName STRING,
corporatePhone STRING,
corporateDocumentType STRING,
corporateDocumentNumber STRING,
documentStartEndDate STRING,
documentFrontImage STRING,
documentBackImage STRING,
oms_code STRING,
coefficient STRING,
updateTime TIMESTAMP,
updateBy BIGINT,
delFlag STRING,
codeImageRecharge STRING,
mallDistinction STRING,
showDefaultPic TINYINT,
mall_dis STRING,
real_business TINYINT
) WITH (
'connector' = 'clickhouse',
'url' = 'clickhouse://:8123',
'database-name' = 'hxhdbtest',
'table-name' = 'ods_mall_info',
'username' = 'default',
'password' = ''
);
create table ods_business_lymerch_detail
(
id BIGINT,
mall_guid String,
mall_mdmid String,
mall_class String,
mall_code String,
mall_status String,
mall_type String,
address String,
area String,
bi_code String,
b_reg_code String,
b_reg_name String,
city_code String,
city_name String,
consumer_hotline String,
country_code String,
country_name String,
full_name String,
gd_gps String,
gps String,
is_open_up TINYINT,
is_up_app TINYINT,
is_up_ly TINYINT,
is_up_sap TINYINT,
oms_code String,
open_date String,
operation_name String,
post_code String,
province_code String,
prv_name String,
ps_code String,
short_name String,
s_reg_code String,
telephone String,
composite_merch String,
del_flag TINYINT,
create_date_time timestamp
) WITH (
'connector' = 'clickhouse',
'url' = 'clickhouse://:8123',
'database-name' = 'hxhdbtest',
'table-name' = 'ods_business_lymerch_detail',
'username' = 'default',
'password' = ''
);
CREATE TABLE ods_mall_city
(
Region_Identifier string,
Chinese_Abbreviation string,
Area_Code string,
Full_Name_Pinyin string,
Full_Name string,
Area_Name string,
Superior_Area_Code string,
Pinyin string,
AreaType string
) WITH (
'connector' = 'clickhouse',
'url' = 'clickhouse://:8123',
'database-name' = 'hxhdbtest',
'table-name' = 'ods_mall_city',
'username' = 'default',
'password' = ''
);
CREATE TABLE dim_mall
(
hxh_mall_id String,
hxh_mall_name String,
hxh_mall_short_name String,
mall_address String,
mall_type INT,
mall_kind String,
mall_area String,
open_date String,
oms_code String,
b_reg_code String,
b_reg_name String,
bi_code String,
ps_code String,
province_code String,
province_name String,
city_code String,
city_name String,
country_code String,
country_name String,
consumer_hotline String,
mall_longitude Decimal(18, 6),
mall_latitude Decimal(18, 6),
city_longitude Decimal(18, 6),
city_latitude Decimal(18, 6),
gd_gps String,
tecent_gps String,
hxh_mall_status String,
mall_industry String,
mall_distinction String,
ly_mall_id String,
ly_mall_guid String,
ly_mall_mdmid String,
ly_mall_code String,
ly_mall_class String,
ly_mall_name String,
state String,
ori_create_time TIMESTAMP(0),
ori_update_time TIMESTAMP(0),
dim_create_date TIMESTAMP(0),
PRIMARY KEY (hxh_mall_id) NOT ENFORCED
) WITH (
'connector' = 'clickhouse',
'url' = 'clickhouse://:8123',
'database-name' = 'hxhdbtest',
'table-name' = 'dim_mall',
'username' = 'default',
'password' = ''
);
INSERT INTO dim_mall
SELECT
cast(mi.id AS VARCHAR),
-- mall code
mi.name AS hxh_mall_name,
-- mall name
mi.shortName AS hxh_mall_short_name,
-- mall short name
bld.address AS mall_address,
-- address
mi.mallType AS mall_type,
-- mall type
bld.mall_type AS mall_kind,
-- mall category
bld.area AS mall_area,
-- mall area
bld.open_date AS open_date,
-- opening date
mi.oms_code AS oms_code,
-- Longyi mall association code
bld.b_reg_code AS b_reg_code,
-- business development center code
bld.b_reg_name AS b_reg_name,
-- business development center name
bld.bi_code AS bi_code,
-- BI code
bld.ps_code AS ps_code,
-- PS code
mi.provinceCode AS province_code,
-- province code
oc_province.Area_Name AS province_name,
-- province name
mi.cityCode AS city_code,
-- city code
oc_city.Area_Name AS city_name,
-- city name
mi.countyCode AS country_code,
-- county code
oc_county.Area_Name AS country_name,
-- county name
bld.consumer_hotline AS consumer_hotline,
-- consumer hotline
mi.longitude AS longitude,
-- mall longitude
mi.latitude AS latitude,
-- mall latitude
0 AS mall_longitude,
0 AS mall_latitude,
bld.gd_gps AS gd_gps,
-- AMap (Gaode) coordinates
bld.gps AS tecent_gps,
-- Tencent Maps coordinates
mi.state AS hxh_mall_status,
-- mall status
mi.industry AS mall_industry,
-- industry
mi.mallDistinction AS mall_distinction,
-- mall distinction: 1 = visible mall, 2 = hidden mall
cast(bld.id AS VARCHAR) AS ly_mall_id,
-- Longyi mall id
bld.mall_guid AS ly_mall_guid,
-- Longyi mall GUID
bld.mall_mdmid AS ly_mall_mdmid,
-- Longyi mall mdmid
bld.mall_code AS ly_mall_code,
-- Longyi mall code
bld.mall_class AS ly_mall_class,
-- Longyi mall class
bld.full_name as ly_mall_name,
-- Longyi mall name
bld.mall_status,
-- merchant onboarding status (1: available)
mi.createTime AS business_create_date,
mi.updateTime AS update_create_date,
now()
FROM
ods_mall_info mi
LEFT JOIN (
SELECT
*
FROM
ods_business_lymerch_detail
WHERE
del_flag = 0
) bld ON
mi.oms_code = bld.oms_code
LEFT JOIN (
SELECT
Area_Code,
Area_Name
FROM
ods_mall_city
WHERE
AreaType = '2'
) oc_province ON
oc_province.Area_Code = mi.provinceCode
LEFT JOIN (
SELECT
Area_Code,
Area_Name
FROM
ods_mall_city
WHERE
AreaType = '3'
) oc_city ON
oc_city.Area_Code = mi.cityCode
LEFT JOIN (
SELECT
Area_Code,
Area_Name
FROM
ods_mall_city
WHERE
AreaType = '4'
) oc_county ON
oc_county.Area_Code = mi.countyCode;
The above is the original SQL. After the left joins the row count actually decreased: ods_mall_info has 40 rows and ods_business_lymerch_detail only 11, but the result has 29 rows (exactly the 11 rows matched from the right table are missing). If the sink's connector is changed to 'print', you can clearly see the three-step + / - / + operations for those 11 rows.
When I set sink.batch-size = '1' on the sink table, no data is lost.
SQL Server source sinking to ClickHouse: data is not written and no error is reported. Setting a breakpoint in ClickHouseBatchOutputFormat#writeRecord shows the records do arrive; with the debugger attached the data does sink, but without breakpoints it never does.
For example:
clickhouse://host1:8123,host2:8123,host3:8123
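A full sink definition using such a load-balanced URL might look like the sketch below (hosts, database, and table names are placeholders):

```sql
CREATE TABLE sink_table (
    id   BIGINT,
    name STRING
) WITH (
    'connector'     = 'clickhouse',
    -- comma-separated endpoints; writes are spread across the listed hosts
    'url'           = 'clickhouse://host1:8123,host2:8123,host3:8123',
    'database-name' = 'default',
    'table-name'    = 'sink_table',
    'username'      = 'default',
    'password'      = ''
);
```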
insert into ... select ... from a left join b on ...
The resulting data loses the rows that satisfy the ON condition; only the non-matching rows remain.
Which remote repository can I pull your jar from?
Hi, after downloading and testing the code, I found that when the scan.partition.* options are not set, the source reads N copies of the data under parallelism N. How can this be solved? Thanks!
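Without partition options, every parallel subtask issues the same full-table scan, hence N copies. A sketch of a partitioned scan, assuming the option names follow the Flink JDBC connector convention (verify against this connector's README):

```sql
CREATE TABLE source_table (
    id   BIGINT,
    name STRING
) WITH (
    'connector'     = 'clickhouse',
    'url'           = 'clickhouse://host1:8123',
    'database-name' = 'default',
    'table-name'    = 'source_table',
    -- each parallel subtask reads one id range instead of the whole table
    -- (option names assumed; check the README)
    'scan.partition.column'      = 'id',
    'scan.partition.num'         = '4',
    'scan.partition.lower-bound' = '0',
    'scan.partition.upper-bound' = '10000'
);
```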
Flink Version : 1.13.2
Flink CDC version: 2.2
flink-connector-clickhouse version : 1.13-release
The workflow is as follows:
MySQL (8.0.28) -> Flink CDC -> Flink -> ClickHouse.
(1) Configuration :
Use a local table (engine: MergeTree) for the ClickHouse sink table, configured as below:
" 'connector' = 'clickhouse'," +
" 'url' = 'clickhouse://clickhouse-server:8123',"+
" 'username' = 'user'"+
" 'password' = 'pass'"+
" 'database-name' = 'database'" +
" 'table-name' = 'clickhouse_sink_table'" +
" 'sink.batch-size' = '500'" +
" 'sink.flush-interval' = '1000'" +
" 'sink.max-retries' = '3'" +
(2) Relevant Flink Java SQL code:
String sql =
"INSERT INTO clickhouse_sink_table SELECT vrfi.*, DATE_FORMAT(vrfi.created, 'yyyy-MM') FROM mysql_source_table as vrfi\n";
statementSet.addInsertSql(sql);
The table in MySQL has '539905' rows:
select count(id) from sbtest1;
+-----------+
| count(id) |
+-----------+
| 539905 |
+-----------+
1 row in set (0.05 sec)
Rows for the ClickHouse sink table:
SELECT COUNT(id)
FROM clickhouse_sink_table

Query id: 775dd633-9a8b-4c13-b272-3b317f0575ab

┌─count(id)─┐
│    556114 │
└───────────┘

SELECT COUNT(DISTINCT id)
FROM clickhouse_sink_table

Query id: 3e6dc971-47e0-4ca0-836b-eaabf280d2e2

┌─uniqExact(id)─┐
│        539905 │
└───────────────┘
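Because the sink is at-least-once, a retried batch can land twice, which matches count > uniqExact above. If the sink table used ReplacingMergeTree keyed on id (a hypothetical change; this issue uses plain MergeTree), the duplicates could be collapsed:

```sql
-- deduplicate at read time, before background merges have run
SELECT count(id) FROM clickhouse_sink_table FINAL;

-- or force merges to collapse duplicates on disk (ReplacingMergeTree only)
OPTIMIZE TABLE clickhouse_sink_table FINAL;
```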
Thanks for your help.
In my opinion, ClickHouseBatchExecutor is meant for append-only scenarios, but in fact it also handles update events when I use flink-connector-clickhouse.
The following exception occurs occasionally:
2022-05-09 12:26:29,777 ERROR org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat [] - ClickHouse executeBatch error, retry times = 0
ru.yandex.clickhouse.except.ClickHouseUnknownException: ClickHouse exception, code: 1002, host: clickhouse-prod.xxx.com, port: 8123; null
	at ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier.getException(ClickHouseExceptionSpecifier.java:92) ~[flink-connector-clickhouse-1.12.0.jar:?]
	at ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier.specify(ClickHouseExceptionSpecifier.java:42) ~[flink-connector-clickhouse-1.12.0.jar:?]
	at ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier.specify(ClickHouseExceptionSpecifier.java:25) ~[flink-connector-clickhouse-1.12.0.jar:?]
	at ru.yandex.clickhouse.ClickHouseStatementImpl.sendStream(ClickHouseStatementImpl.java:1071) ~[flink-connector-clickhouse-1.12.0.jar:?]
	at ru.yandex.clickhouse.ClickHouseStatementImpl.sendStream(ClickHouseStatementImpl.java:1026) ~[flink-connector-clickhouse-1.12.0.jar:?]
	at ru.yandex.clickhouse.ClickHouseStatementImpl.sendStream(ClickHouseStatementImpl.java:1019) ~[flink-connector-clickhouse-1.12.0.jar:?]
	at ru.yandex.clickhouse.ClickHousePreparedStatementImpl.executeBatch(ClickHousePreparedStatementImpl.java:381) ~[flink-connector-clickhouse-1.12.0.jar:?]
	at ru.yandex.clickhouse.ClickHousePreparedStatementImpl.executeBatch(ClickHousePreparedStatementImpl.java:364) ~[flink-connector-clickhouse-1.12.0.jar:?]
	at org.apache.flink.connector.clickhouse.internal.executor.ClickHouseBatchExecutor.executeBatch(ClickHouseBatchExecutor.java:72) ~[flink-connector-clickhouse-1.12.0.jar:?]
	at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.attemptFlush(AbstractClickHouseOutputFormat.java:84) ~[flink-connector-clickhouse-1.12.0.jar:?]
	at org.apache.flink.connector.clickhouse.internal.ClickHouseBatchOutputFormat.flush(ClickHouseBatchOutputFormat.java:93) ~[flink-connector-clickhouse-1.12.0.jar:?]
	at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.lambda$scheduledFlush$0(AbstractClickHouseOutputFormat.java:66) ~[flink-connector-clickhouse-1.12.0.jar:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_322]
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_322]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_322]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_322]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_322]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_322]
	at java.lang.Thread.run(Thread.java:750) [?:1.8.0_322]
Caused by: java.util.ConcurrentModificationException
	at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:911) ~[?:1.8.0_322]
	at java.util.ArrayList$Itr.next(ArrayList.java:861) ~[?:1.8.0_322]
	at ru.yandex.clickhouse.ClickHousePreparedStatementImpl$BatchHttpEntity.writeTo(ClickHousePreparedStatementImpl.java:419) ~[flink-connector-clickhouse-1.12.0.jar:?]
	at ru.yandex.clickhouse.ClickHouseStatementImpl$WrappedHttpEntity.writeTo(ClickHouseStatementImpl.java:98) ~[flink-connector-clickhouse-1.12.0.jar:?]
	at org.apache.http.impl.DefaultBHttpClientConnection.sendRequestEntity(DefaultBHttpClientConnection.java:156) ~[flink-connector-clickhouse-1.12.0.jar:?]
	at org.apache.http.impl.conn.CPoolProxy.sendRequestEntity(CPoolProxy.java:152) ~[flink-connector-clickhouse-1.12.0.jar:?]
	at org.apache.http.protocol.HttpRequestExecutor.doSendRequest(HttpRequestExecutor.java:238) ~[flink-connector-clickhouse-1.12.0.jar:?]
	at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:123) ~[flink-connector-clickhouse-1.12.0.jar:?]
	at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:272) ~[flink-connector-clickhouse-1.12.0.jar:?]
	at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186) ~[flink-connector-clickhouse-1.12.0.jar:?]
	at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89) ~[flink-connector-clickhouse-1.12.0.jar:?]
	at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185) ~[flink-connector-clickhouse-1.12.0.jar:?]
	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83) ~[flink-connector-clickhouse-1.12.0.jar:?]
	at ru.yandex.clickhouse.ClickHouseStatementImpl.sendStream(ClickHouseStatementImpl.java:1059) ~[flink-connector-clickhouse-1.12.0.jar:?]
	... 15 more
In fact, in the UTC+8 time zone it should return the input argument minus 8 hours.
In initial mode, the data was synchronized to ClickHouse, but an error caused the task to fail:
Exception in thread "main" org.apache.flink.table.api.TableException: Failed to wait job finish
at org.apache.flink.table.api.internal.InsertResultIterator.hasNext(InsertResultIterator.java:56)
at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370)
at org.apache.flink.table.utils.PrintUtils.printAsTableauForm(PrintUtils.java:152)
at org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:160)
at com.demo.mysql.v2.CDCMain.main(CDCMain.java:89)
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at org.apache.flink.table.api.internal.InsertResultIterator.hasNext(InsertResultIterator.java:54)
... 4 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:258)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1389)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
at akka.dispatch.OnComplete.internal(Future.scala:300)
at akka.dispatch.OnComplete.internal(Future.scala:297)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
at scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:60)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala)
at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
at scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:60)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
at java.util.concurrent.ForkJoinTask.doExec$$$capture(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:252)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:242)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:233)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:684)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:444)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
at akka.actor.ActorCell.invoke(ActorCell.scala:548)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
... 5 more
Caused by: java.lang.RuntimeException: Flush exception found.
at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.checkFlushException(AbstractClickHouseOutputFormat.java:103)
at org.apache.flink.connector.clickhouse.internal.ClickHouseBatchOutputFormat.writeRecord(ClickHouseBatchOutputFormat.java:77)
at org.apache.flink.connector.clickhouse.internal.ClickHouseBatchOutputFormat.writeRecord(ClickHouseBatchOutputFormat.java:16)
at org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.invoke(OutputFormatSinkFunction.java:87)
at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:50)
at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:65)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
at StreamExecCalc$7.processElement(Unknown Source)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:196)
at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101)
at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter$OutputCollector.collect(MySqlRecordEmitter.java:143)
at com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.emit(RowDataDebeziumDeserializeSchema.java:157)
at com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.deserialize(RowDataDebeziumDeserializeSchema.java:139)
at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitElement(MySqlRecordEmitter.java:118)
at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:100)
at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:54)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:156)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:351)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Flush exception found.
at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.checkFlushException(AbstractClickHouseOutputFormat.java:103)
at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.checkBeforeFlush(AbstractClickHouseOutputFormat.java:72)
at org.apache.flink.connector.clickhouse.internal.ClickHouseBatchOutputFormat.flush(ClickHouseBatchOutputFormat.java:93)
at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.lambda$scheduledFlush$0(AbstractClickHouseOutputFormat.java:59)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset$$$capture(FutureTask.java:308)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: java.lang.RuntimeException: Flush exception found.
... 12 more
Caused by: java.io.IOException: java.sql.SQLException: Attempt to execute batch failed, exhausted retry times = 3
at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.checkBeforeFlush(AbstractClickHouseOutputFormat.java:76)
... 10 more
Caused by: java.sql.SQLException: Attempt to execute batch failed, exhausted retry times = 3
at org.apache.flink.connector.clickhouse.internal.executor.ClickHouseExecutor.attemptExecuteBatch(ClickHouseExecutor.java:67)
at org.apache.flink.connector.clickhouse.internal.executor.ClickHouseUpsertExecutor.executeBatch(ClickHouseUpsertExecutor.java:125)
at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.checkBeforeFlush(AbstractClickHouseOutputFormat.java:74)
... 10 more
Hello, a question: can I query ClickHouse data in real time? For example, I have a table being written to continuously, and another program needs to query all of today's data so far (say 20 million rows) and then match it against a stream. Is that feasible?
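If the connector can serve as a lookup source (check the README for lookup/dimension-table support), matching stream records against the ClickHouse table is usually written as a processing-time temporal join rather than repeatedly scanning 20 million rows; a sketch with placeholder names:

```sql
-- `proc_time AS PROCTIME()` must be declared on the stream table
SELECT s.order_id, d.name
FROM stream_table AS s
JOIN ck_dim_table FOR SYSTEM_TIME AS OF s.proc_time AS d
    ON s.user_id = d.id;
```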
Using the 1.12 branch: I compiled it, added it to my project, and ran the simplest possible SQL, select name from student; validation fails with the error below.
The same code runs fine on 1.13 and 1.14.
Stepping through the code, the failure happens at SqlNode parsed = parser.parse(statement); inside ParserImpl.class.
Is there a bug in 1.12?
Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. null
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:111)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:189)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:77)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:660)
at com.big.data.app.test4.main(test4.java:37)
Caused by: java.lang.UnsupportedOperationException
at org.apache.flink.connector.clickhouse.catalog.ClickHouseCatalog.getTableStatistics(ClickHouseCatalog.java:452)
at org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.extractTableStats(DatabaseCalciteSchema.java:117)
at org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.getStatistic(DatabaseCalciteSchema.java:104)
at org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.lambda$getTable$0(DatabaseCalciteSchema.java:81)
at java.util.Optional.map(Optional.java:215)
at org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.getTable(DatabaseCalciteSchema.java:77)
at org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:83)
at org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:289)
at org.apache.calcite.sql.validate.EmptyScope.resolve_(EmptyScope.java:143)
at org.apache.calcite.sql.validate.EmptyScope.resolveTable(EmptyScope.java:99)
at org.apache.calcite.sql.validate.DelegatingScope.resolveTable(DelegatingScope.java:203)
at org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:112)
at org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:184)
at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1067)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1041)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3205)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3187)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3461)
at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1067)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1041)
at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1016)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:724)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:147)
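The root cause above is ClickHouseCatalog.getTableStatistics throwing UnsupportedOperationException, which the 1.12 planner calls during validation (1.13+ apparently tolerate or avoid the call). A possible patch, sketched against Flink's Catalog API (a fragment, not compiled against this repository), is to report unknown statistics instead of throwing:

```java
// Sketch: return the standard "unknown" placeholder from
// org.apache.flink.table.catalog.stats so validation can proceed
// instead of aborting with UnsupportedOperationException.
@Override
public CatalogTableStatistics getTableStatistics(ObjectPath tablePath) {
    return CatalogTableStatistics.UNKNOWN;
}
```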
Are there any connector options to limit how much data is pulled when the job starts?
Hello, when sinking Flink SQL to ClickHouse, can the Flink SQL ROW data type be mapped to ClickHouse's JSON data type in the ClickHouse table definition?
Hello, after my streaming job had been writing for about an hour, the batch flush started failing; all three retries failed. The log is below:
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:216)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:206)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:197)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:682)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Flush exception found.
at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.checkFlushException(AbstractClickHouseOutputFormat.java:103)
at org.apache.flink.connector.clickhouse.internal.ClickHouseBatchOutputFormat.writeRecord(ClickHouseBatchOutputFormat.java:77)
at org.apache.flink.connector.clickhouse.internal.ClickHouseBatchOutputFormat.writeRecord(ClickHouseBatchOutputFormat.java:16)
at org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.invoke(OutputFormatSinkFunction.java:87)
at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:49)
at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:65)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at StreamExecCalc$149.processElement(Unknown Source)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
Caused by: java.io.IOException: java.sql.SQLException: Attempt to execute batch failed, exhausted retry times = 3
at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.checkBeforeFlush(AbstractClickHouseOutputFormat.java:76)
at org.apache.flink.connector.clickhouse.internal.ClickHouseBatchOutputFormat.flush(ClickHouseBatchOutputFormat.java:93)
at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.lambda$scheduledFlush$0(AbstractClickHouseOutputFormat.java:59)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLException: Attempt to execute batch failed, exhausted retry times = 3
at org.apache.flink.connector.clickhouse.internal.executor.ClickHouseExecutor.attemptExecuteBatch(ClickHouseExecutor.java:67)
at org.apache.flink.connector.clickhouse.internal.executor.ClickHouseUpsertExecutor.executeBatch(ClickHouseUpsertExecutor.java:125)
at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.checkBeforeFlush(AbstractClickHouseOutputFormat.java:74)
... 9 more
I want to use Flink CDC SQL to create a wide table (sink_ck) in ClickHouse from two or more MySQL CDC source tables, such as tb1 and tb2.
I submit the job via the Flink client:
insert into sink_ck SELECT a.*, b.order_name FROM tb1 a LEFT JOIN tb2 b ON a.order_id = b.order_id
When a tb1 row is updated in MySQL, the corresponding row is deleted in sink_ck. Why?
I want an update of a tb1 row in MySQL to produce the same update in ClickHouse.
Syncing a single table to ClickHouse with Flink CDC handles insert/update/delete without problems.
A single-table update works; the logged statements are:
INSERT INTO ck_order (id, order_id, merchant_code, merchant_name, create_time, update_time) FORMAT TabSeparated
ALTER TABLE flink.ck_order UPDATE order_id='20042100002', merchant_code='BU0000000002YGP', merchant_name='广州2贸易有限公司_2', create_time='2021-04-21 10:41:22', update_time='2022-04-25 10:33:58' WHERE id=2
When the wide table is updated, only a DELETE is issued; there is no ALTER TABLE ... UPDATE:
INSERT INTO ck_order_detail (id, order_id, merchant_code, merchant_name, create_time, update_time, order_name) FORMAT TabSeparated
ALTER TABLE flink.ck_order_detail DELETE WHERE id=2
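The DELETE is most likely the retract half of the changelog that the LEFT JOIN emits: an update on tb1 arrives at the sink as a retraction of the old joined row followed by a new insert, and when only the retraction is applied as ALTER TABLE ... DELETE, the row disappears. As an illustrative workaround (the option name appears in other configurations in this thread; behavior not verified for your versions), the sink can be told to drop delete messages:

```sql
-- Hypothetical fragment of the sink_ck DDL: ignore delete messages so the
-- retraction half of the join's changelog does not remove rows in ClickHouse.
'sink.ignore-delete' = 'true'
```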
How to solve the ClickHouse distributed write problem?
The following error occurs when I use a regular statement:
Code: 48, e.displayText() = DB::Exception: There was an error on [chi-bpiih-clickhouse-gxqtcluster-0-0:9000]: Code: 48, e.displayText() = DB::Exception: Mutations are not supported by storage Distributed (version 21.1.3.32) (version 21.1.3.32)
at ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier.specify(ClickHouseExceptionSpecifier.java:59) ~[flink-connector-clickhouse-1.14.3-SNAPSHOT.jar:?]
at ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier.specify(ClickHouseExceptionSpecifier.java:29) ~[flink-connector-clickhouse-1.14.3-SNAPSHOT.jar:?]
at ru.yandex.clickhouse.ClickHouseStatementImpl.checkForErrorAndThrow(ClickHouseStatementImpl.java:1094) ~[flink-connector-clickhouse-1.14.3-SNAPSHOT.jar:?]
at ru.yandex.clickhouse.ClickHouseStatementImpl.getInputStream(ClickHouseStatementImpl.java:773) ~[flink-connector-clickhouse-1.14.3-SNAPSHOT.jar:?]
at ru.yandex.clickhouse.ClickHouseStatementImpl.executeStatement(ClickHouseStatementImpl.java:255) ~[flink-connector-clickhouse-1.14.3-SNAPSHOT.jar:?]
at ru.yandex.clickhouse.ClickHouseStatementImpl.executeBatch(ClickHouseStatementImpl.java:593) ~[flink-connector-clickhouse-1.14.3-SNAPSHOT.jar:?]
at ru.yandex.clickhouse.ClickHousePreparedStatementImpl.executeBatch(ClickHousePreparedStatementImpl.java:388) ~[flink-connector-clickhouse-1.14.3-SNAPSHOT.jar:?]
at ru.yandex.clickhouse.ClickHousePreparedStatementImpl.executeBatch(ClickHousePreparedStatementImpl.java:364) ~[flink-connector-clickhouse-1.14.3-SNAPSHOT.jar:?]
at org.apache.flink.connector.clickhouse.internal.executor.ClickHouseExecutor.attemptExecuteBatch(ClickHouseExecutor.java:53) ~[flink-connector-clickhouse-1.14.3-SNAPSHOT.jar:?]
at org.apache.flink.connector.clickhouse.internal.executor.ClickHouseUpsertExecutor.executeBatch(ClickHouseUpsertExecutor.java:125) ~[flink-connector-clickhouse-1.14.3-SNAPSHOT.jar:?]
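Code 48 means ClickHouse's Distributed engine does not support mutations (ALTER TABLE ... UPDATE / DELETE), so upsert-mode writes against a Distributed table will always fail. One possible workaround, sketched with option names taken from another configuration in this thread (not verified against your setup), is to route writes to the shard-local MergeTree tables instead of the Distributed table:

```sql
-- Hypothetical sink options: write to the local tables behind the Distributed
-- table so that mutations run on MergeTree engines, which do support them.
'sink.distribute-table-write-local' = 'true',
'sink.partition-strategy' = 'hash',
'sink.partition-key' = 'id'
```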
CREATE TABLE source (
`id` int,
name varchar,
t time
) WITH (
'connector' = 'datagen'
);
CREATE TABLE sink (
`id` int,
name varchar,
t time ,
primary key(id) not enforced
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:clickhouse://url.163.org:8123/',
'database-name' = 'lyblocal',
'username' = 'default',
'password' = '',
'table-name' = 'mt3',
'sink.distribute-table-write-local' = 'false',
'sink.ignore-delete' = 'false',
'sink.partition-strategy' = 'hash',
'sink.partition-key' = 'id',
'sink.buffer-flush.max-rows' = '1'
);
insert into sink
select id,name,t from source;
The error is as follows:
Column 0, name: id, type: Int32, parsed text: "-816858032"
Column 1, name: name, type: String, parsed text: "9fb989b731b7d0f32d66dee746308ee77eb311db99a220adc78817b43e23c36ae32975e02834ec1f54308fd10d7f60f8c2fb"
Column 2, name: t, type: DateTime, parsed text: "17"ERROR: DateTime must be in YYYY-MM-DD hh:mm:ss or NNNNNNNNNN (unix timestamp, exactly 10 digits) format.
Code: 41, e.displayText() = DB::ParsingException: Cannot parse datetime
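The parse failure comes from the Flink column t being declared as TIME, which serializes as a seconds-of-day value ("17"), while the ClickHouse column is DateTime and expects 'YYYY-MM-DD hh:mm:ss'. A minimal sketch of a fix, assuming the ClickHouse column should stay DateTime, is to generate a TIMESTAMP instead:

```sql
-- Sketch: declare t as TIMESTAMP so datagen emits values that a ClickHouse
-- DateTime column can parse (alternatively, CAST in the INSERT ... SELECT).
CREATE TABLE source (
  `id` int,
  name varchar,
  t timestamp(0)
) WITH (
  'connector' = 'datagen'
);
```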
When sinking MySQL data to ClickHouse, the timestamps are off by 8 hours. Is there a parameter to set?
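An 8-hour shift usually points to a UTC vs. Asia/Shanghai mismatch. Two knobs worth checking, hedged because the right one depends on where the shift is introduced (parameter names from Flink's table configuration and the ru.yandex ClickHouse JDBC driver; verify against your versions):

```sql
-- Flink side: interpret TIMESTAMP_LTZ values in the session time zone.
SET 'table.local-time-zone' = 'Asia/Shanghai';
-- JDBC side: pin the driver's time zone via URL parameters, e.g.
-- 'url' = 'clickhouse://host:8123?use_server_time_zone=false&use_time_zone=Asia/Shanghai'
```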
When a row in the MySQL source is updated, ClickHouse deletes the old row but does not insert the updated one.
sink.ignore-delete = false
Issue: java.lang.UnsupportedOperationException: Please use prepareStatement(ClickHouseConnection connection) instead.
at org.apache.flink.connector.clickhouse.internal.executor.ClickHouseUpsertExecutor.prepareStatement(ClickHouseUpsertExecutor.java:63)
at org.apache.flink.connector.clickhouse.internal.ClickHouseBatchOutputFormat.open(ClickHouseBatchOutputFormat.java:63)
... 13 more
Flink 1.14.4, CDC 2.2.1, ClickHouse connector jar built from the latest 1.14.3 branch. A single-table sync job runs for a while and then throws java.lang.RuntimeException: Flush exception found.
at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.checkFlushException(AbstractClickHouseOutputFormat.java:103)
at org.apache.flink.connector.clickhouse.internal.ClickHouseBatchOutputFormat.writeRecord(ClickHouseBatchOutputFormat.java:77)
at org.apache.flink.connector.clickhouse.internal.ClickHouseBatchOutputFormat.writeRecord(ClickHouseBatchOutputFormat.java:16)
at org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.invoke(OutputFormatSinkFunction.java:87)
at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:50)
at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:65)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:196)
at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101)
at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter$OutputCollector.collect(MySqlRecordEmitter.java:143)
at com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.emit(RowDataDebeziumDeserializeSchema.java:157)
at com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.deserialize(RowDataDebeziumDeserializeSchema.java:139)
at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitElement(MySqlRecordEmitter.java:118)
at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:100)
at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:54)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:351)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Flush exception found.
at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.checkFlushException(AbstractClickHouseOutputFormat.java:103)
at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.checkBeforeFlush(AbstractClickHouseOutputFormat.java:72)
at org.apache.flink.connector.clickhouse.internal.ClickHouseBatchOutputFormat.flush(ClickHouseBatchOutputFormat.java:93)
at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.lambda$scheduledFlush$25(AbstractClickHouseOutputFormat.java:59)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: java.lang.RuntimeException: Flush exception found.
... 11 more
I want to use it, but the Maven dependency cannot be found. What is the Maven repository URL?
ClickHouseExecutor#attemptExecuteBatch
default void attemptExecuteBatch(ClickHousePreparedStatement stmt, int maxRetries)
        throws SQLException {
    for (int i = 0; i < maxRetries; i++) {
        try {
            stmt.executeBatch();
            return;
        } catch (Exception exception) {
            LOG.error("ClickHouse executeBatch error, retry times = {}", i, exception);
            try {
                Thread.sleep(1000L * i);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new SQLException(
                        "Unable to flush; interrupted while doing another attempt", ex);
            }
        }
    }
    throw new SQLException(
            String.format(
                    "Attempt to execute batch failed, exhausted retry times = %d", maxRetries));
}
From the error log we can only see the three retries, while the job keeps restarting internally; from a business perspective, users generally only look at the exceptions shown in the Flink UI.
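Note that in the loop above, Thread.sleep(1000L * i) waits 0 ms before the first retry, so the first two attempts run back to back. A minimal sketch of a retry helper with exponential backoff (the name BackoffRetry and its signature are illustrative, not part of the connector):

```java
import java.util.concurrent.Callable;

public class BackoffRetry {

    /** Runs the action up to maxRetries times, sleeping baseDelayMs * 2^attempt after each failure. */
    public static <T> T run(Callable<T> action, int maxRetries, long baseDelayMs) throws Exception {
        Exception last = null;
        for (int attempt = 0; attempt < maxRetries; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                // Exponential backoff: baseDelayMs, 2*baseDelayMs, 4*baseDelayMs, ...
                Thread.sleep(baseDelayMs << attempt);
            }
        }
        throw new Exception("Attempt failed, exhausted retry times = " + maxRetries, last);
    }

    public static void main(String[] args) throws Exception {
        final int[] calls = {0};
        // Fails twice, then succeeds on the third attempt.
        String result = run(() -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new RuntimeException("transient failure");
            }
            return "ok";
        }, 5, 1);
        System.out.println(result + " after " + calls[0] + " attempts"); // prints "ok after 3 attempts"
    }
}
```

Surfacing the retry count as a metric, or failing the job with the root cause instead of restarting silently, would also make these failures visible in the Flink UI rather than only in the TaskManager log.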
When I use SQL Server CDC as the source, inserting and deleting records in the SQL Server table works normally, but updating a record throws an exception and the job exits.
CREATE TABLE ch_user (
  `id` int NOT NULL,
  `name` varchar(50),
  `comment` varchar(255),
  `create_time` timestamp,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'clickhouse',
'url' = 'clickhouse://xxxxx:8123',
'database-name' = 'default',
'table-name' = 'user',
'username' = 'default',
'password' = '123456',
'sink.batch-size' = '500',
'sink.flush-interval' = '1000',
'sink.max-retries' = '3'
);
2022-03-03 15:14:48
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:216)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:206)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:197)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:682)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:517)
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Flush exception found.
at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.checkFlushException(AbstractClickHouseOutputFormat.java:103)
at org.apache.flink.connector.clickhouse.internal.ClickHouseBatchOutputFormat.writeRecord(ClickHouseBatchOutputFormat.java:77)
at org.apache.flink.connector.clickhouse.internal.ClickHouseBatchOutputFormat.writeRecord(ClickHouseBatchOutputFormat.java:16)
at org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.invoke(OutputFormatSinkFunction.java:87)
at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:49)
at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:65)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:317)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:411)
at com.ververica.cdc.debezium.internal.DebeziumChangeFetcher.emitRecordsUnderCheckpointLock(DebeziumChangeFetcher.java:252)
at com.ververica.cdc.debezium.internal.DebeziumChangeFetcher.handleBatch(DebeziumChangeFetcher.java:237)
at com.ververica.cdc.debezium.internal.DebeziumChangeFetcher.runFetchLoop(DebeziumChangeFetcher.java:163)
at com.ververica.cdc.debezium.DebeziumSourceFunction.run(DebeziumSourceFunction.java:446)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
Caused by: java.lang.RuntimeException: Flush exception found.
at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.checkFlushException(AbstractClickHouseOutputFormat.java:103)
at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.checkBeforeFlush(AbstractClickHouseOutputFormat.java:72)
at org.apache.flink.connector.clickhouse.internal.ClickHouseBatchOutputFormat.flush(ClickHouseBatchOutputFormat.java:93)
at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.lambda$scheduledFlush$0(AbstractClickHouseOutputFormat.java:59)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Flush exception found.
... 11 more
Caused by: java.io.IOException: java.sql.SQLException: Attempt to execute batch failed, exhausted retry times = 3
at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.checkBeforeFlush(AbstractClickHouseOutputFormat.java:76)
... 9 more
Caused by: java.sql.SQLException: Attempt to execute batch failed, exhausted retry times = 3
at org.apache.flink.connector.clickhouse.internal.executor.ClickHouseExecutor.attemptExecuteBatch(ClickHouseExecutor.java:67)
at org.apache.flink.connector.clickhouse.internal.executor.ClickHouseUpsertExecutor.executeBatch(ClickHouseUpsertExecutor.java:125)
at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.checkBeforeFlush(AbstractClickHouseOutputFormat.java:74)
... 9 more
When updating data, a Distributed table with use-local=true throws errors, but a local table does not; when inserting data, both the Distributed table with use-local=true and the local table work without errors.
e.displayText() = DB::Exception: There was an error on Cannot execute replicated DDL query on leader
at ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier.specify(ClickHouseExceptionSpecifier.java:59)
at ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier.specify(ClickHouseExceptionSpecifier.java:29)
at ru.yandex.clickhouse.ClickHouseStatementImpl.checkForErrorAndThrow(ClickHouseStatementImpl.java:1094)
at ru.yandex.clickhouse.ClickHouseStatementImpl.getInputStream(ClickHouseStatementImpl.java:773)
at ru.yandex.clickhouse.ClickHouseStatementImpl.executeStatement(ClickHouseStatementImpl.java:255)
at ru.yandex.clickhouse.ClickHouseStatementImpl.executeBatch(ClickHouseStatementImpl.java:593)
at ru.yandex.clickhouse.ClickHousePreparedStatementImpl.executeBatch(ClickHousePreparedStatementImpl.java:388)
at ru.yandex.clickhouse.ClickHousePreparedStatementImpl.executeBatch(ClickHousePreparedStatementImpl.java:364)
at org.apache.flink.connector.clickhouse.internal.executor.ClickHouseExecutor.attemptExecuteBatch(ClickHouseExecutor.java:53)
at org.apache.flink.connector.clickhouse.internal.executor.ClickHouseUpsertExecutor.executeBatch(ClickHouseUpsertExecutor.java:125)
at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.checkBeforeFlush(AbstractClickHouseOutputFormat.java:74)
at org.apache.flink.connector.clickhouse.internal.ClickHouseShardOutputFormat.flush(ClickHouseShardOutputFormat.java:160)
at org.apache.flink.connector.clickhouse.internal.ClickHouseShardOutputFormat.flush(ClickHouseShardOutputFormat.java:154)
at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.lambda$scheduledFlush$0(AbstractClickHouseOutputFormat.java:59)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
1. Error: Unable to create a sink for writing table 'default_catalog.default_database.ck'
2. Error: java.lang.NoSuchMethodError: org.apache.flink.table.factories.DynamicTableFactory$Context.getCatalogTable()Lorg/apache/flink/table/catalog/CatalogTable;
3. Maven could not pull the connector artifact, so I manually downloaded the 1.12 build instead.
4. The Flink version is 1.13.6.
The ClickHouse table options are:
'connector'='clickhouse'
'database-name'='XX'
'sink.batch-size'='500'
'sink.flush-interval'='1000'
'sink.max-retries'='3'
'table-name'='ck'
'url'='clickhouse://XX:9876'
Hoping for some help.
I got the error message below.
Caused by: java.lang.NoClassDefFoundError: org/apache/flink/runtime/util/ExecutorThreadFactory
at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.scheduledFlush(AbstractClickHouseOutputFormat.java:59)
at org.apache.flink.connector.clickhouse.internal.ClickHouseBatchOutputFormat.open(ClickHouseBatchOutputFormat.java:68)
It seems org/apache/flink/runtime/util/ExecutorThreadFactory has been removed from Flink 1.14.0. Do you have a plan to upgrade to Flink 1.14.0?
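The relocation can be checked directly on the classpath. Below is a small probe (a hypothetical helper, not connector code) that looks up both the pre-1.14 and post-1.14 fully-qualified names of `ExecutorThreadFactory`; whichever prints `present` matches the Flink version in use.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hedged sketch: probe which package provides ExecutorThreadFactory
 * on the Flink version at hand. The class moved from flink-runtime
 * (org.apache.flink.runtime.util) to flink-core
 * (org.apache.flink.util.concurrent) around Flink 1.14.
 */
public class ThreadFactoryProbe {

    static List<String> probe() {
        String[] candidates = {
            "org.apache.flink.runtime.util.ExecutorThreadFactory",   // Flink <= 1.13
            "org.apache.flink.util.concurrent.ExecutorThreadFactory" // Flink >= 1.14
        };
        List<String> results = new ArrayList<>();
        for (String name : candidates) {
            try {
                Class.forName(name);
                results.add("present: " + name);
            } catch (ClassNotFoundException e) {
                results.add("missing: " + name);
            }
        }
        return results;
    }

    public static void main(String[] args) {
        probe().forEach(System.out::println);
    }
}
```

Running this without Flink on the classpath reports both names as missing; with exactly one present, it tells you which binary of the connector you need.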
Caused by: org.apache.kafka.connect.errors.ConnectException: Lock wait timeout exceeded; try restarting transaction Error code: 1205; SQLSTATE: 40001.
Suppose the following pipeline:
mysql-cdc (debezium-json) -> clickhouse
debezium-json turns each update in the CDC changelog into two events: a retraction (-Delete) followed by an insert (+Insert).
When this event stream is written to ClickHouse in real time through flink-clickhouse-connector, the delete triggered by the -Delete event is executed asynchronously, so the execution order can get scrambled: the +Insert may run before the -Delete.
In that scenario the data ends up wrong: an update on the source side can ultimately show up in ClickHouse as a delete.
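The reordering described above can be reduced to a few lines. The sketch below (a hypothetical class, not the connector's code) buffers the two halves of a retract pair in separate batches and flushes the insert batch first, which leaves the row deleted:

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Minimal sketch of why buffering -D and +I events in separate batches
 * can reorder an update: each update arrives as a retract pair, but the
 * two batches are flushed independently.
 */
public class RetractOrderingSketch {

    /** Simulates a flush where the insert batch happens to run first. */
    static List<String> run() {
        List<String> deleteBatch = new ArrayList<>();
        List<String> insertBatch = new ArrayList<>();

        // One upstream UPDATE arrives as -D(old row) followed by +I(new row).
        deleteBatch.add("DELETE key=1");
        insertBatch.add("INSERT key=1 v=new");

        // If the insert batch is flushed before the delete batch,
        // the freshly written row is removed by the later delete.
        List<String> executed = new ArrayList<>();
        executed.addAll(insertBatch);
        executed.addAll(deleteBatch);
        return executed; // final state: row with key=1 is gone
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Any fix has to serialize the two batches per key (or fold the pair into a single upsert) so the delete can never land after the insert it precedes logically.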
1. The project currently uses ru.yandex.clickhouse.ClickHouseDriver; the current driver class is com.clickhouse.jdbc.ClickHouseDriver. Upstream recommends migrating to the new driver: https://github.com/ClickHouse/clickhouse-jdbc
2. Is there any plan to raise the Java baseline to 11 or 17? (Just a question, no particular motivation.)
sql:
CREATE TABLE sink_table (
ID String,
NAME String,
PRIMARY KEY (ID) NOT ENFORCED) WITH (
'connector' = 'clickhouse',
'sink.batch-size' = '500',
'sink.ignore-delete' = 'false',
'sink.flush-interval' = '1000',
'sink.max-retries' = '300',
'use-local' = 'true',
'url' = 'clickhouse://127.0.0.1:8123',
'database-name' = 'default',
'username' = 'clickhouse',
'password' = 'clickhouse',
'table-name' = 'demo' )
error:
java.io.IOException: Unable to establish connection to ClickHouse
at org.apache.flink.connector.clickhouse.internal.ClickHouseShardOutputFormat.open(ClickHouseShardOutputFormat.java:106)
at org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.open(OutputFormatSinkFunction.java:65)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:100)
at org.apache.flink.table.runtime.operators.sink.SinkOperator.open(SinkOperator.java:58)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:110)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.lang.Thread.run(Thread.java:750)
Caused by: ru.yandex.clickhouse.except.ClickHouseUnknownException: ClickHouse exception, code: 1002, host: 127.0.0.1, port: 9000; Magic is not correct: 60
at ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier.getException(ClickHouseExceptionSpecifier.java:92)
at ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier.specify(ClickHouseExceptionSpecifier.java:56)
at ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier.specify(ClickHouseExceptionSpecifier.java:25)
at ru.yandex.clickhouse.ClickHouseStatementImpl.executeQuery(ClickHouseStatementImpl.java:351)
at ru.yandex.clickhouse.ClickHouseStatementImpl.executeQuery(ClickHouseStatementImpl.java:324)
at ru.yandex.clickhouse.ClickHouseStatementImpl.executeQuery(ClickHouseStatementImpl.java:319)
at ru.yandex.clickhouse.ClickHouseStatementImpl.executeQuery(ClickHouseStatementImpl.java:314)
at ru.yandex.clickhouse.ClickHouseConnectionImpl.initConnection(ClickHouseConnectionImpl.java:91)
at ru.yandex.clickhouse.ClickHouseConnectionImpl.(ClickHouseConnectionImpl.java:78)
at ru.yandex.clickhouse.ClickHouseDriver.connect(ClickHouseDriver.java:62)
at ru.yandex.clickhouse.BalancedClickhouseDataSource.getConnection(BalancedClickhouseDataSource.java:195)
at org.apache.flink.connector.clickhouse.internal.connection.ClickHouseConnectionProvider.createConnection(ClickHouseConnectionProvider.java:129)
at org.apache.flink.connector.clickhouse.internal.connection.ClickHouseConnectionProvider.createAndStoreShardConnection(ClickHouseConnectionProvider.java:95)
at org.apache.flink.connector.clickhouse.internal.connection.ClickHouseConnectionProvider.createShardConnections(ClickHouseConnectionProvider.java:82)
at org.apache.flink.connector.clickhouse.internal.ClickHouseShardOutputFormat.open(ClickHouseShardOutputFormat.java:85)
... 14 more
Caused by: java.io.IOException: Magic is not correct: 60
at ru.yandex.clickhouse.response.ClickHouseLZ4Stream.readNextBlock(ClickHouseLZ4Stream.java:94)
at ru.yandex.clickhouse.response.ClickHouseLZ4Stream.checkNext(ClickHouseLZ4Stream.java:75)
at ru.yandex.clickhouse.response.ClickHouseLZ4Stream.read(ClickHouseLZ4Stream.java:51)
at ru.yandex.clickhouse.response.StreamSplitter.readFromStream(StreamSplitter.java:92)
at ru.yandex.clickhouse.response.StreamSplitter.next(StreamSplitter.java:53)
at ru.yandex.clickhouse.response.ClickHouseResultSet.(ClickHouseResultSet.java:95)
at ru.yandex.clickhouse.ClickHouseStatementImpl.createResultSet(ClickHouseStatementImpl.java:1121)
at ru.yandex.clickhouse.ClickHouseStatementImpl.updateResult(ClickHouseStatementImpl.java:224)
at ru.yandex.clickhouse.ClickHouseStatementImpl.executeQuery(ClickHouseStatementImpl.java:344)
... 25 more
In your project the ClickHouse source is only supported on Flink 1.13 (or higher); is there any way to support a ClickHouse source on Flink 1.12?
With the settings
'sink.batch-size' = '100',
'sink.flush-interval' = '5000'
and a small data set (160 rows), only part of the data is sunk; even after the sink.flush-interval period has elapsed, the remaining rows are never flushed.
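For clarity, here is a hedged sketch (a hypothetical class, not the connector's code) of the contract those two options describe: flush when the buffer reaches sink.batch-size, and also on a timer every sink.flush-interval milliseconds so a partial batch is not held back indefinitely. The reported bug is precisely that the second trigger does not fire for the trailing 60 rows.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hedged sketch of the batch-size + flush-interval contract:
 * a size-based flush plus a periodic flush for partial batches.
 */
public class BatchFlushSketch {
    private final int batchSize;
    private final List<String> buffer = new ArrayList<>();
    final List<List<String>> flushedBatches = new ArrayList<>();

    BatchFlushSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    synchronized void write(String record) {
        buffer.add(record);
        if (buffer.size() >= batchSize) {
            flush(); // size-based trigger
        }
    }

    /**
     * In the real sink this is also invoked by a scheduled executor
     * every flush-interval milliseconds (the interval-based trigger).
     */
    synchronized void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        flushedBatches.add(new ArrayList<>(buffer));
        buffer.clear();
    }

    public static void main(String[] args) {
        BatchFlushSketch sink = new BatchFlushSketch(100);
        for (int i = 0; i < 160; i++) {
            sink.write("row-" + i);
        }
        // What the interval timer should do for the trailing 60 rows:
        sink.flush();
        System.out.println(sink.flushedBatches.size()); // 2 batches: 100 + 60
    }
}
```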
In https://issues.apache.org/jira/browse/FLINK-24253 the internal JDBC dialect classes have been made public.
It makes sense to refactor the code to extend them; however, this requires an upgrade to 1.15.x (expected to be released in Feb 2022).
I would be happy to help here, but first I would like to know whether there are any blockers for such an upgrade.
The following dependency has to be added to the pom, but I don't know why. Does the author know?
<dependency>
  <groupId>com.google.errorprone</groupId>
  <artifactId>javac</artifactId>
  <version>9+181-r4173-1</version>
</dependency>
at com.ververica.cdc.debezium.internal.DebeziumChangeFetcher.emitRecordsUnderCheckpointLock(DebeziumChangeFetcher.java:259)
at com.ververica.cdc.debezium.internal.DebeziumChangeFetcher.handleBatch(DebeziumChangeFetcher.java:244)
at com.ververica.cdc.debezium.internal.DebeziumChangeFetcher.runFetchLoop(DebeziumChangeFetcher.java:162)
at com.ververica.cdc.debezium.DebeziumSourceFunction.run(DebeziumSourceFunction.java:444)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:323)
Caused by: java.io.IOException: java.sql.SQLException: Attempt to execute batch failed, exhausted retry times = 3
at com.ververica.cdc.connectors.clickhouse.internal.AbstractClickHouseOutputFormat.checkBeforeFlush(AbstractClickHouseOutputFormat.java:76)
at com.ververica.cdc.connectors.clickhouse.internal.ClickHouseBatchOutputFormat.flush(ClickHouseBatchOutputFormat.java:94)
at com.ververica.cdc.connectors.clickhouse.internal.AbstractClickHouseOutputFormat.lambda$scheduledFlush$0(AbstractClickHouseOutputFormat.java:59)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.sql.SQLException: Attempt to execute batch failed, exhausted retry times = 3
at com.ververica.cdc.connectors.clickhouse.internal.executor.ClickHouseExecutor.attemptExecuteBatch(ClickHouseExecutor.java:67)
mysql: 'connector' = 'mysql-cdc'
clickhouse: this connector
CREATE TABLE orders (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.20.187',
'port' = '3309',
'username' = 'root',
'password' = '123456',
'database-name' = 'mydb',
'table-name' = 'orders'
);
CREATE TABLE clickhouseorders (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'clickhouse',
'url' = 'clickhouse://192.168.20.187:8123',
'database-name' = 'test',
'table-name' = 'clickhouseorders',
'sink.batch-size' = '500',
'sink.flush-interval' = '1000',
'sink.max-retries' = '3'
);
3.
INSERT INTO clickhouseorders
SELECT o.order_id,o.order_date, o.customer_name, o.price
FROM orders AS o;
result:
2022-02-14 14:42:44
java.lang.NoClassDefFoundError: org/apache/flink/util/concurrent/ExecutorThreadFactory
at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.scheduledFlush(AbstractClickHouseOutputFormat.java:52)
at org.apache.flink.connector.clickhouse.internal.ClickHouseBatchOutputFormat.open(ClickHouseBatchOutputFormat.java:69)
at org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.open(OutputFormatSinkFunction.java:65)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.table.runtime.operators.sink.SinkOperator.open(SinkOperator.java:58)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:442)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:585)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:565)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.lang.Thread.run(Thread.java:748)
What should I do next? Thanks.