flink-zhisheng / flink-connector-clickhouse

This project forked from itinycheng/flink-connector-clickhouse

Flink SQL connector for ClickHouse. Supports ClickHouseCatalog and writing primitive data, maps, and arrays to ClickHouse.

License: Apache License 2.0

Flink ClickHouse Connector

Flink SQL connector for the ClickHouse database, powered by ClickHouse JDBC.

The original code comes from AliYun. Building on it, I have fixed bugs, optimized the code, and added support for more data types. The project currently supports only sink tables; source tables will be implemented in the future.

Connector Options

| Option | Required | Default | Type | Description |
| --- | --- | --- | --- | --- |
| url | required | none | String | The ClickHouse JDBC URL, in the format clickhouse://<host>:<port>. |
| username | optional | none | String | The ClickHouse username. 'username' and 'password' must both be specified if either one is. |
| password | optional | none | String | The ClickHouse password. |
| database-name | optional | default | String | The ClickHouse database name. |
| table-name | required | none | String | The ClickHouse table name. |
| sink.batch-size | optional | 1000 | Integer | The maximum batch size; buffered data is flushed once this many records accumulate. |
| sink.flush-interval | optional | 1s | Duration | The flush interval; once it elapses, asynchronous threads flush the buffered data. |
| sink.max-retries | optional | 3 | Integer | The maximum number of retries when writing records to the database fails. |
| sink.write-local | optional | false | Boolean | Write directly to the local tables when the target is a distributed table. |
| sink.partition-strategy | optional | balanced | String | Partition strategy: balanced (round-robin), hash (partition key), shuffle (random). |
| sink.partition-key | optional | none | String | Partition key used by the hash strategy. |
| sink.ignore-delete | optional | true | Boolean | Whether to ignore delete statements. |
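
The three values of `sink.partition-strategy` can be pictured with a small sketch. This is not the connector's code: the class `PartitionStrategySketch`, its methods, and the idea of numbering the targets `0..numShards-1` are assumptions made for illustration only.

```java
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical illustration of the three strategies named in the options table.
final class PartitionStrategySketch {
    private final int numShards;
    private int next = 0; // cursor used by the round-robin strategy

    PartitionStrategySketch(int numShards) {
        if (numShards <= 0) {
            throw new IllegalArgumentException("numShards must be positive");
        }
        this.numShards = numShards;
    }

    // balanced: cycle through the shards in order (round-robin).
    int balanced() {
        int shard = next;
        next = (next + 1) % numShards;
        return shard;
    }

    // hash: the same partition key value always maps to the same shard.
    // floorMod keeps the result non-negative even for negative hash codes.
    int hash(Object partitionKeyValue) {
        return Math.floorMod(Objects.hashCode(partitionKeyValue), numShards);
    }

    // shuffle: pick a shard uniformly at random.
    int shuffle() {
        return ThreadLocalRandom.current().nextInt(numShards);
    }
}
```

Under these assumptions, `hash` is the only strategy that keeps all rows with the same `sink.partition-key` value together, which is why that option is paired with it.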

Data Type Mapping

| Flink Type | ClickHouse Type (Sink) | ClickHouse Type (Source) |
| --- | --- | --- |
| CHAR | String | |
| VARCHAR | String / IP / UUID | |
| STRING | String / Enum | |
| BOOLEAN | UInt8 | |
| BYTES | FixedString | |
| DECIMAL | Decimal / Int128 / Int256 / UInt64 / UInt128 / UInt256 | |
| TINYINT | Int8 | |
| SMALLINT | Int16 / UInt8 | |
| INTEGER | Int32 / UInt16 / Interval | |
| BIGINT | Int64 / UInt32 | |
| FLOAT | Float32 | |
| DOUBLE | Float64 | |
| DATE | Date | |
| TIME | DateTime | |
| TIMESTAMP | DateTime | |
| TIMESTAMP_LTZ | DateTime | |
| INTERVAL_YEAR_MONTH | Int32 | |
| INTERVAL_DAY_TIME | Int64 | |
| ARRAY | Array | |
| MAP | Map | |
| ROW | Not supported | |
| MULTISET | Not supported | |
| RAW | Not supported | |
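
The mapping table can also be read as a lookup from a Flink type to the ClickHouse type families it may be written to. The sketch below encodes the table that way so a column pairing can be checked before a DDL is written. The class `TypeMappingSketch`, its method names, and the normalization of parameterized names (for example `Decimal(10, 2)` or `Enum8(...)`) are illustrative assumptions, not part of the connector.

```java
import java.util.List;
import java.util.Map;

// Hypothetical encoding of the sink column of the mapping table.
final class TypeMappingSketch {
    static final Map<String, List<String>> SINK_TYPES = Map.ofEntries(
        Map.entry("CHAR", List.of("String")),
        Map.entry("VARCHAR", List.of("String", "IP", "UUID")),
        Map.entry("STRING", List.of("String", "Enum")),
        Map.entry("BOOLEAN", List.of("UInt8")),
        Map.entry("BYTES", List.of("FixedString")),
        Map.entry("DECIMAL", List.of("Decimal", "Int128", "Int256", "UInt64", "UInt128", "UInt256")),
        Map.entry("TINYINT", List.of("Int8")),
        Map.entry("SMALLINT", List.of("Int16", "UInt8")),
        Map.entry("INTEGER", List.of("Int32", "UInt16", "Interval")),
        Map.entry("BIGINT", List.of("Int64", "UInt32")),
        Map.entry("FLOAT", List.of("Float32")),
        Map.entry("DOUBLE", List.of("Float64")),
        Map.entry("DATE", List.of("Date")),
        Map.entry("TIME", List.of("DateTime")),
        Map.entry("TIMESTAMP", List.of("DateTime")),
        Map.entry("TIMESTAMP_LTZ", List.of("DateTime")),
        Map.entry("INTERVAL_YEAR_MONTH", List.of("Int32")),
        Map.entry("INTERVAL_DAY_TIME", List.of("Int64")),
        Map.entry("ARRAY", List.of("Array")),
        Map.entry("MAP", List.of("Map")));

    // Reduce a concrete ClickHouse type name to the family used in the table.
    // Assumption: parameters are dropped, and Enum8/Enum16, IPv4/IPv6 and
    // Interval* names collapse to "Enum", "IP" and "Interval".
    static String family(String clickHouseType) {
        int paren = clickHouseType.indexOf('(');
        String base = (paren < 0 ? clickHouseType : clickHouseType.substring(0, paren)).trim();
        if (base.startsWith("Enum")) return "Enum";
        if (base.startsWith("IPv")) return "IP";
        if (base.startsWith("Interval")) return "Interval";
        return base;
    }

    // True when the table lists the ClickHouse family for the given Flink type.
    // ROW, MULTISET and RAW have no entry, so they are always rejected.
    static boolean isSupported(String flinkType, String clickHouseType) {
        return SINK_TYPES.getOrDefault(flinkType, List.of()).contains(family(clickHouseType));
    }
}
```

For instance, `TypeMappingSketch.isSupported("BIGINT", "UInt32")` accepts the pairing listed in the table, while any pairing with `ROW` is rejected. Types the table does not mention, such as `DateTime64`, are treated as unsupported by this sketch.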

Maven Dependency

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-clickhouse</artifactId>
    <version>1.12.0-SNAPSHOT</version>
</dependency>

How to use

Create and use sink table

-- register a clickhouse table `t_user` in flink sql.
CREATE TABLE t_user (
    `user_id` BIGINT,
    `user_type` INTEGER,
    `language` STRING,
    `country` STRING,
    `gender` STRING,
    `score` DOUBLE,
    `list` ARRAY<STRING>,
    `map` MAP<STRING, BIGINT>,
    PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
    'connector' = 'clickhouse',
    'url' = 'clickhouse://{ip}:{port}',
    'database-name' = 'tutorial',
    'table-name' = 'users',
    'sink.batch-size' = '500',
    'sink.flush-interval' = '1000',
    'sink.max-retries' = '3'
);

-- write data into the clickhouse table from the table `T`
INSERT INTO t_user
SELECT
    CAST(`user_id` AS BIGINT),
    `user_type`,
    `lang`,
    `country`,
    `gender`,
    `score`,
    ARRAY['CODER', 'SPORTSMAN'],
    CAST(MAP['BABA', CAST(10 AS BIGINT), 'NIO', CAST(8 AS BIGINT)] AS MAP<STRING, BIGINT>)
FROM T;
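
The DDL above sets `sink.batch-size`, `sink.flush-interval` and `sink.max-retries`. How such options typically interact can be sketched as a small buffer that flushes when it fills up, can also be flushed by a timer, and retries a failed write. This is a simplified illustration and not the connector's implementation: `BatchingSinkSketch`, the `writer` callback, and the reading of "max retries" as one initial attempt plus up to `maxRetries` further attempts are all assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical buffer showing size-triggered flushing with retries.
final class BatchingSinkSketch<T> {
    private final int batchSize;              // plays the role of sink.batch-size
    private final int maxRetries;             // plays the role of sink.max-retries
    private final Consumer<List<T>> writer;   // stand-in for the actual database write
    private final List<T> buffer = new ArrayList<>();

    BatchingSinkSketch(int batchSize, int maxRetries, Consumer<List<T>> writer) {
        if (batchSize <= 0 || maxRetries < 0) {
            throw new IllegalArgumentException("batchSize must be > 0 and maxRetries >= 0");
        }
        this.batchSize = batchSize;
        this.maxRetries = maxRetries;
        this.writer = writer;
    }

    // Buffer one record and flush as soon as the batch is full.
    synchronized void add(T record) {
        buffer.add(record);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // A background timer would call this every sink.flush-interval so that
    // a partially filled batch does not wait indefinitely.
    synchronized void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<T> batch = new ArrayList<>(buffer);
        RuntimeException last = null;
        // Assumption: one initial attempt plus up to maxRetries retries.
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                writer.accept(batch);
                buffer.clear(); // drop records only after a successful write
                return;
            } catch (RuntimeException e) {
                last = e;
            }
        }
        throw last; // all attempts failed; the records stay buffered
    }
}
```

With the values from the DDL, a buffer like this would write after 500 records or after roughly one second, whichever comes first, and would give up on a batch only after the retries are exhausted.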

Create and use ClickHouseCatalog

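import java.util

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
// ClickHouseCatalog and ClickHouseConfig are provided by this connector; import them from its packages.

// Assumed setup: the original snippet does not show how `setting` is built.
val setting = EnvironmentSettings.newInstance().inStreamingMode().build()
val tEnv = TableEnvironment.create(setting)

// Connection and sink properties passed to the catalog.
val props = new util.HashMap[String, String]()
props.put(ClickHouseConfig.DATABASE_NAME, "default")
props.put(ClickHouseConfig.URL, "clickhouse://127.0.0.1:8123")
props.put(ClickHouseConfig.USERNAME, "username")
props.put(ClickHouseConfig.PASSWORD, "password")
props.put(ClickHouseConfig.SINK_FLUSH_INTERVAL, "30s")

// Register the catalog and make it the current one.
val cHcatalog = new ClickHouseCatalog("clickhouse", props)
tEnv.registerCatalog("clickhouse", cHcatalog)
tEnv.useCatalog("clickhouse")

// Write through the catalog using a fully qualified table name.
tEnv.executeSql("insert into `clickhouse`.`default`.`t_table` select...")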

Roadmap

  • Implement the Flink SQL Sink function. (done)
  • Support Array and Map types. (done)
  • Support ClickHouseCatalog. (done)
  • Implement the Flink SQL Source function. (planned)
