Coder Social home page Coder Social logo

starrocks-connector-for-apache-flink's Introduction

starrocks-connector-for-apache-flink

This is connector for Apache Flink®

Prerequisites

<dependency>
    <groupId>com.starrocks</groupId>
    <artifactId>flink-connector-starrocks</artifactId>
    <!-- for Apache Flink® 1.15 -->
    <version>x.x.x_flink-1.15</version>
    <!-- for Apache Flink® 1.14 -->
    <version>x.x.x_flink-1.14_2.11</version>
    <version>x.x.x_flink-1.14_2.12</version>
    <!-- for Apache Flink® 1.13 -->
    <version>x.x.x_flink-1.13_2.11</version>
    <version>x.x.x_flink-1.13_2.12</version>
    <!-- for Apache Flink® 1.12 -->
    <version>x.x.x_flink-1.12_2.11</version>
    <version>x.x.x_flink-1.12_2.12</version>
    <!-- for Apache Flink® 1.11 -->
    <version>x.x.x_flink-1.11_2.11</version>
    <version>x.x.x_flink-1.11_2.12</version>
</dependency>

Click HERE to get the latest version.

Apache Flink® source

StarRocksSourceOptions options = StarRocksSourceOptions.builder()
    .withProperty("scan-url", "fe_ip1:8030,fe_ip2:8030,fe_ip3:8030")
    .withProperty("jdbc-url", "jdbc:mysql://fe_ip:9030")
    .withProperty("username", "root")
    .withProperty("password", "")
    .withProperty("table-name", "flink_test")
    .withProperty("database-name", "test")
    .build();
TableSchema tableSchema = TableSchema.builder()
    .field("date_1", DataTypes.DATE())
    .field("datetime_1", DataTypes.TIMESTAMP(6))
    .field("char_1", DataTypes.CHAR(20))
    .field("varchar_1", DataTypes.STRING())
    .field("boolean_1", DataTypes.BOOLEAN())
    .field("tinyint_1", DataTypes.TINYINT())
    .field("smallint_1", DataTypes.SMALLINT())
    .field("int_1", DataTypes.INT())
    .field("bigint_1", DataTypes.BIGINT())
    .field("largeint_1", DataTypes.STRING())
    .field("float_1", DataTypes.FLOAT())
    .field("double_1", DataTypes.DOUBLE())
    .field("decimal_1", DataTypes.DECIMAL(27, 9))
    .build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(StarRocksSource.source(options, tableSchema)).setParallelism(5).print();
env.execute("StarRocks flink source");

OR

// create a table with `structure` and `properties`
CREATE TABLE flink_test (
    date_1 DATE,
    datetime_1 TIMESTAMP(6),
    char_1 CHAR(20),
    varchar_1 VARCHAR,
    boolean_1 BOOLEAN,
    tinyint_1 TINYINT,
    smallint_1 SMALLINT,
    int_1 INT,
    bigint_1 BIGINT,
    largeint_1 STRING,
    float_1 FLOAT,
    double_1 DOUBLE,
    decimal_1 DECIMAL(27,9)
) WITH (
   'connector'='starrocks',
   'scan-url'='fe_ip1:8030,fe_ip2:8030,fe_ip3:8030',
   'jdbc-url'='jdbc:mysql://fe_ip:9030',
   'username'='root',
   'password'='',
   'database-name'='flink_test',
   'table-name'='flink_test'
);

select date_1, smallint_1 from flink_test where char_1 <> 'A' and int_1 = -126

Source options

Option Required Default Type Description
connector YES NONE String starrocks
scan-url YES NONE String Hosts of the fe nodes like: fe_ip1:http_port,fe_ip2:http_port....
jdbc-url YES NONE String Hosts of the fe nodes like: fe_ip1:query_port,fe_ip2: query_port....
username YES NONE String StarRocks user name.
password YES NONE String StarRocks user password.
database-name YES NONE String Database name
table-name YES NONE String Table name
scan.connect.timeout-ms NO 1000 String Connect timeout
scan.params.keep-alive-min NO 10 String Max keep alive time min
scan.params.query-timeout-s NO 600(5min) String Query timeout for a single query(The value of this parameter needs to be longer than the estimated period of the source)
scan.params.mem-limit-byte NO 102410241024(1G) String Memory limit for a single query
scan.max-retries NO 1 String Max request retry times.

Source metrics

Name Type Description
totalScannedRows counter successfully collected data

Source type mappings

StarRocks Flink
NULL NULL
BOOLEAN BOOLEAN
TINYINT TINYINT
SMALLINT SMALLINT
INT INT
BIGINT BIGINT
LARGEINT STRING
FLOAT FLOAT
DOUBLE DOUBLE
DATE DATE
DATETIME TIMESTAMP
DECIMAL DECIMAL
DECIMALV2 DECIMAL
DECIMAL32 DECIMAL
DECIMAL64 DECIMAL
DECIMAL128 DECIMAL
CHAR CHAR
VARCHAR STRING

Source tips

  1. exactly-once semantic cannot be guaranteed in the case of a task failure.
  2. Only SQLs without aggregation like select {*|columns|count(1)} from {table-name} where ... are supported.

Flink sink

// -------- sink with raw json string stream --------
fromElements(new String[]{
    "{\"score\": \"99\", \"name\": \"stephen\"}",
    "{\"score\": \"100\", \"name\": \"lebron\"}"
}).addSink(
    StarRocksSink.sink(
        // the sink options
        StarRocksSinkOptions.builder()
            .withProperty("jdbc-url", "jdbc:mysql://ip:port,ip:port?xxxxx")
            .withProperty("load-url", "ip:port;ip:port")
            .withProperty("username", "xxx")
            .withProperty("password", "xxx")
            .withProperty("table-name", "xxx")
            .withProperty("database-name", "xxx")
            .withProperty("sink.properties.format", "json")
            .withProperty("sink.properties.strip_outer_array", "true")
            .withProperty("sink.parallelism", "1")
            .build()
    )
);


// -------- sink with stream transformation --------
class RowData {
    public int score;
    public String name;
    public RowData(int score, String name) {
        ......
    }
}
fromElements(
    new RowData[]{
        new RowData(99, "stephen"),
        new RowData(100, "lebron")
    }
).addSink(
    StarRocksSink.sink(
        // the table structure
        TableSchema.builder()
            .field("score", DataTypes.INT())
            .field("name", DataTypes.VARCHAR(20))
            .build(),
        // the sink options
        StarRocksSinkOptions.builder()
            .withProperty("jdbc-url", "jdbc:mysql://ip:port,ip:port?xxxxx")
            .withProperty("load-url", "ip:port;ip:port")
            .withProperty("username", "xxx")
            .withProperty("password", "xxx")
            .withProperty("table-name", "xxx")
            .withProperty("database-name", "xxx")
            .withProperty("sink.properties.format", "json")
            .withProperty("sink.properties.strip_outer_array", "true")
            .withProperty("sink.parallelism", "1")
            .build(),
        // set the slots with streamRowData
        (slots, streamRowData) -> {
            slots[0] = streamRowData.score;
            slots[1] = streamRowData.name;
        }
    )
);

OR

// create a table with `structure` and `properties`
tEnv.executeSql(
    "CREATE TABLE USER_RESULT(" +
        "name VARCHAR," +
        "score BIGINT" +
    ") WITH ( " +
        "'connector' = 'starrocks'," +
        "'jdbc-url'='jdbc:mysql://ip:port,ip:port?xxxxx'," +
        "'load-url'='ip:port;ip:port'," +
        "'database-name' = 'xxx'," +
        "'table-name' = 'xxx'," +
        "'username' = 'xxx'," +
        "'password' = 'xxx'," +
        "'sink.buffer-flush.interval-ms' = '15000'," +
        "'sink.properties.format' = 'json'," +
        "'sink.properties.strip_outer_array' = 'true'," +
        "'sink.parallelism' = '1'," +
        "'sink.max-retries' = '10'," +
    ")"
);

Sink options

Option Required Default Type Description
connector YES NONE String starrocks
jdbc-url YES NONE String this will be used to execute queries in starrocks.
load-url YES NONE String fe_ip:http_port;fe_ip:http_port separated with ;, which would be used to do the batch sinking.
database-name YES NONE String starrocks database name
table-name YES NONE String starrocks table name
username YES NONE String starrocks connecting username
password YES NONE String starrocks connecting password
sink.semantic NO at-least-once String at-least-once or exactly-once(flush at checkpoint only and options like sink.buffer-flush.* won't work either).
sink.buffer-flush.max-bytes NO 94371840(90M) String the max batching size of the serialized data, range: [64MB, 10GB].
sink.buffer-flush.max-rows NO 500000 String the max batching rows, range: [64,000, 5000,000].
sink.buffer-flush.interval-ms NO 300000 String the flushing time interval, range: [1000ms, 3600000ms].
sink.max-retries NO 3 String max retry times of the stream load request, range: [0, 1000].
sink.parallelism NO NULL String Specify the parallelism of the sink individually. Remove it if you want to follow the global parallelism settings.
sink.connect.timeout-ms NO 1000 String Timeout in millisecond for connecting to the load-url, range: [100, 60000].
sink.label-prefix NO NO String the prefix of the stream load label, available characters are within [-_A-Za-z0-9].
sink.properties.* NO NONE String the stream load properties like 'sink.properties.columns' = 'k1, v1'.

Sink metrics

Name Type Description
totalFlushBytes counter successfully flushed bytes.
totalFlushRows counter successfully flushed rows.
totalFlushSucceededTimes counter number of times that the data-batch been successfully flushed.
totalFlushFailedTimes counter number of times that the flushing been failed.

Sink type mappings

Flink type StarRocks type
BOOLEAN BOOLEAN
TINYINT TINYINT
SMALLINT SMALLINT
INTEGER INTEGER
BIGINT BIGINT
FLOAT FLOAT
DOUBLE DOUBLE
DECIMAL DECIMAL
BINARY INT
CHAR JSON / STRING
VARCHAR JSON / STRING
STRING JSON / STRING
DATE DATE
TIMESTAMP_WITHOUT_TIME_ZONE(N) DATETIME
TIMESTAMP_WITH_LOCAL_TIME_ZONE(N) DATETIME
ARRAY<T> ARRAY<T>
MAP<KT,VT> JSON / JSON STRING
ROW<arg T...> JSON / JSON STRING

Sink tips

  1. Flush action was triggered at-least-once when: cachedRows >= ${sink.buffer-flush.max-rows} || cachedBytes >= ${sink.buffer-flush.max-bytes} || idleTime >= ${sink.buffer-flush.interval-ms}
  2. sink.buffer-flush.{max-rows|max-bytes|interval-ms} becomes invalid when it comes with the exactly-once semantic.

starrocks-connector-for-apache-flink's People

Contributors

baisui1981 avatar banmoy avatar bigdata-kuxingseng avatar chaplinthink avatar dixingxing0 avatar ggke avatar hehuiyuan avatar hffariel avatar imay avatar jameswangchen avatar lyu302 avatar shouweikun avatar waittttting avatar xlfjcg avatar

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.