flink-kafka-hbase

Features: writes Kafka messages to HBase in real time. Supports messages in CSV or JSON string format, custom composite rowkeys, column families and column names, and joining different HBase tables on different fields of the Kafka stream with custom target column families and columns (evaluate performance before enabling joins).
Supports at-least-once semantics.
External dependency: the Apollo configuration center. The project is configuration-driven, and the configuration is stored in Apollo.
Configuration:

{
    "indexColumnMapping": {    --indexColumnMapping即CSV格式消息的key和value按照value里的分隔符拼接后再分割后下标及写入hbase列的对应关系
        "0": "basic:time",     --第0列始终是kafka消息的key,如果不需要可以不指定
        "1": "basic:user_id",
        "2": "basic:session_id",
        "3": "basic:test_mission_id",
        "4": "basic:stratege_name",
        "5": "basic:status_type",
        "6": "basic:status_id",
        "7": "basic:position",
        "8": "basic:like_count",
        "9": "basic:retweet_count",
        "10": "basic:reply_count",
        "11": "basic:fav_count",
        "12": "basic:reward_amount",
        "13": "basic:reward_user_count",
        "14": "basic:status_hot_score",
        "15": "basic:status_hot_score_norm",
        "16": "basic:user_score",
        "17": "basic:use_score_norm",
        "18": "basic:stock_score",
        "19": "basic:stock_score_norm",
        "20": "basic:tag",
        "21": "basic:tag_score",
        "22": "basic:stock_symbol",
        "23": "basic:ip",
        "24": "basic:device",
        "25": "basic:country_name",
        "26": "basic:city_name",
        "27": "basic:topic_score",
        "28": "basic:rerank_name",
        "29": "basic:author_block_count",
        "30": "basic:percent",
        "31": "basic:random_id",
        "32": "basic:rank_score",
        "33": "basic:quote_string",
        "34": "basic:click_num",
        "35": "basic:show_num",
        "36": "basic:tag_short_term_click",
        "37": "basic:tag_short_term_show",
        "38": "basic:tag_long_term_click",
        "39": "basic:tag_long_term_show",
        "40": "basic:block_count",
        "41": "basic:context_info",
        "42": "basic:recent_behavior",
        "43": "basic:basic_string",
        "44": "basic:mention_stock_rank",
        "45": "basic:text_quality_score",
        "46": "basic:last_nc_context",
        "47": "basic:keywords"
    },
    "rowKeyDelimiter": "#",    --如果rowkey是多个列的拼接,则需指定的拼接符
    "rowKeyColumns": [         --rowkey组成的列
        "basic:user_id",
        "basic:statusId"
    ],
    "tableName": "cy_test",    --数据流要写入hbase表的表名(如不存在会自动创建)
    "kafkaConfig": {           --flink接入kafka数据源的配置
        "bootstrapServers": "singgel:9092",    --kafka的broker list
        "topic": "recommend2.statistics",         --需要接入的topic
        "groupId": "flink_recommend2_statistic_join_test2",    --flink中消费kafka topic的groupId
        "delimiter": "|",          --kafka消息value的分隔符,当valueFormat=CSV时必须指定       
        "valueFormat": "CSV",      --kafka消息value的格式,目前支持"CSV"和"JSON"两种
        "optionalProps": {}        --其他kafka消费者配置
    },
    "hbaseConfig": {      --写入的hbase集群配置
        "zookerperQuorum": "singgel-53-3.inter.singgel.com,singgel-53-4.inter.singgel.com,singgel-53-5.inter.singgel.com,singgel-53-6.inter.singgel.com,singgel-54-3.inter.singgel.com,singgel-54-4.inter.singgel.com,singgel-54-5.inter.singgel.com,singgel-54-6.inter.singgel.com",
        "port": "2181",
        "zookeeperZondeParent": "/hbase-unsecure",      --hbase在zookeeper中的根目录节点名称(注意:咱内部cdh集群是/hbase,此处是ambari集群hbase的配置)
        "batchCount": 100,           --批量写入的条数,与interval条件满足其一就触发写入,注:当接入的topic数据源生产速率较小时且无join时,可以设置为1,逐条写入
        "interval": 5000,            --批量写入的间隔时间
        "optionalProp": {}           --其他hbase设置
    },
    "jobName": "recommend_feature_hbase_sink_test",    --flink job的名称
    "parallelism": 8,               --flink任务执行时的平行度
    "jarName": "flink-kafka-hbase-1.0-SNAPSHOT-jar-with-dependencies.jar",    --执行flink任务的jar包,当通过实时平台界面提交flinkjob时需要指定
    "joinTables": [  --需要join的表,可以指定多个,可以为空;当要join多个表时,需要评估一下性能
        {
            "tableName": "user_feature",     --需要join的hbase表
            "joinKey": "basic:userId",       --join的字段,需要在"indexColumnMapping的values中,且是joinTable的rowKey
            "columnsMapping": {       --join表中的列和要写入表中的列的对应关系,key->fromFamily:fromColumn,value->toFamily:toColumn,from和to的列簇和列不需一致
                "basic:pagerank": "basic :pagerank",     
                "basic:country": "basic:country",
                "basic:province": "basic:province",
                "basic:city": "basic:city",
                "basic:mobile": "basic:mobile",
                "basic:follower_cluster": "basic:follower_cluster",
                "basic:quality_cluster": "basic:quality_cluster",
                "basic:symbol_cluster": "basic:symbol_cluster",
                "basic:topic_cluster": "basic:topic_cluster",
                "basic:stock_click7": "basic:stock_click7",
                "basic:stock_show7": "basic:stock_show7",
                "basic:stock_click30": "basic:stock_click30",
                "basic:stock_show30": "basic:stock_show30",
                "basic:symbol_page_enter": "basic:symbol_page_enter",
                "basic:symbol_new_status": "basic:symbol_new_status",
                "basic:symbol_hot": "basic:symbol_hot",
                "basic:symbol_finance": "basic:symbol_finance",
                "basic:symbol_news": "basic:symbol_news",
                "basic:symbol_notice": "basic:symbol_notice",
                "basic:symbol_general": "basic:symbol_general",
                "basic:symbol_page_view": "basic:symbol_page_view",
                "basic:symbol_page_origin": "basic:symbol_page_origin",
                "basic:attention_mark": "basic:attention_mark",
                "basic:rebalance_num": "basic:rebalance_num",
                "basic:topic_personal_short_click": "basic:topic_personal_short_click",
                "basic:topic_personal_short_show": "basic:topic_personal_short_show",
                "basic:topic_personal_long_click": "basic:topic_personal_long_click",
                "basic:topic_personal_long_show": "basic:topic_personal_long_show",
                "basic:dislike_1st": "basic:dislike_1st",
                "basic:dislike_2st": "basic:dislike_2st",
                "basic:dislike_3st": "basic:dislike_3st",
                "basic:dislike_4st": "basic:dislike_4st",
                "basic:dislike_5st": "basic:dislike_5st",
                "basic:familar_1st": "basic:familar_1st",
                "basic:familar_2st": "basic:familar_2st",
                "basic:familar_3st": "basic:familar_3st",
                "basic:familar_4st": "basic:familar_4st",
                "basic:familar_5st": "basic:familar_5st",
                "basic:like_1st": "basic:like_1st",
                "basic:like_2st": "basic:like_2st",
                "basic:like_3st": "basic:like_3st",
                "basic:like_4st": "basic:like_4st",
                "basic:like_5st": "basic:like_5st",
                "basic:unfamilar_1st": "basic:unfamilar_1st",
                "basic:unfamilar_2st": "basic:unfamilar_2st",
                "basic:unfamilar_3st": "basic:unfamilar_3st",
                "basic:unfamilar_4st": "basic:unfamilar_4st",
                "basic:unfamilar_5st": "basic:unfamilar_5st",
                "basic:headline_down_cnt": "basic:headline_down_cnt",
                "basic:headline_up_cnt": "basic:headline_up_cnt",
                "basic:optional_cnt": "basic:optional_cnt",
                "basic:dynamic_cnt": "basic:dynamic_cnt",
                "basic:quotation_cnt": "basic:quotation_cnt",
                "basic:base_rate": "basic:base_rate",
                "basic:mark_gegu_enter": "basic:mark_gegu_enter",
                "basic:mark_share_sum": "basic:mark_share_sum",
                "basic:mark_head_dislike_sum": "basic:mark_head_dislike_sum",
                "basic:mark_status_post_user_sum": "basic:mark_status_post_user_sum",
                "basic:mark_search_sum": "basic:mark_search_sum",
                "basic:mark_debate_post_user_num": "basic:mark_debate_post_user_num",
                "basic:author_click_week": "basic:author_click_week",
                "basic:author_show_week": "basic:author_show_week",
                "basic:author_click_month": "basic:author_click_month",
                "basic:author_show_month": "basic:author_show_month"
            }
        },
        {
            "tableName": "status_feature_string",
            "joinKey": "basic:statusId",
            "columnsMapping": {
                "basic:user_id": "basic:user_id",
                "basic:symbol_id": "basic:symbol_id",
                "basic:created_at": "basic:created_at",
                "basic:source": "basic:source",
                "basic:retweet_status_id": "basic:retweet_status_id",
                "basic:paid_mention_id": "basic:paid_mention_id",
                "basic:retweet_user_id": "basic:retweet_user_id",
                "basic:retweet_symbol_id": "basic:retweet_symbol_id",
                "basic:truncate": "basic:truncate",
                "basic:flags": "basic:flags",
                "basic:expired_at": "basic:expired_at",
                "basic:title_length": "basic:title_length",
                "basic:title_hash": "basic:title_hash",
                "basic:title_flag": "basic:title_flag",
                "basic:text_length": "basic:text_length",
                "basic:pic_count": "basic:pic_count",
                "basic:type": "basic:type",
                "basic:meta_classes": "basic:meta_classes",
                "basic:pic_score": "basic:pic_score",
                "basic:domain": "basic:domain",
                "basic:url_hash": "basic:url_hash",
                "basic:character_percent": "basic:character_percent",
                "basic:symbol": "basic:symbol",
                "basic:keyword": "basic:keyword",
                "basic:match_word": "basic:match_word",
                "basic:keyword_title": "basic:keyword_title",
                "basic:keyword_des": "basic:keyword_des",
                "basic:symbol_title": "basic:symbol_title",
                "basic:symbol_sim_title": "basic:symbol_sim_title",
                "basic:symbol_des": "basic:symbol_des",
                "basic:symbol_sim_des": "basic:symbol_sim_des",
                "basic:symbol_content": "basic:symbol_content",
                "basic:symbol_sim_content": "basic:symbol_sim_content"
            }
        }
    ]
}

When the Flink job starts, it fetches the specified configuration from the Apollo configuration center and runs the job according to it.
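
A minimal sketch of how such a configuration could be pulled from Apollo with the official Java client and mapped onto the project's JobConfig class. The namespace ("application") and property key ("jobConfig") are illustrative assumptions, not necessarily the names the project uses:

import com.ctrip.framework.apollo.Config;
import com.ctrip.framework.apollo.ConfigService;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ApolloConfigLoader {

    /**
     * Reads the JSON job configuration from Apollo and maps it onto JobConfig.
     * The namespace "application" and the property key "jobConfig" are assumptions
     * for illustration; the real project may use different names.
     */
    public static JobConfig load() throws Exception {
        Config apolloConfig = ConfigService.getConfig("application"); // assumed namespace
        String json = apolloConfig.getProperty("jobConfig", null);    // assumed property key
        if (json == null) {
            throw new IllegalStateException("job configuration not found in Apollo");
        }
        return new ObjectMapper().readValue(json, JobConfig.class);
    }
}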

The key implementation, HbaseSink, is as follows:

@Slf4j
public class HbaseSink extends RichSinkFunction<ObjectNode> implements CheckpointedFunction {
 
    private final JobConfig jobConfig;
 
    private HbaseUtil hbaseUtil;
 
    private long currentTime = System.currentTimeMillis();
 
    /**
     * When the Flink job retries automatically, the data in state is restored first; if the job is cancelled and resubmitted manually, the state starts empty.
     */
    private transient ListState<ObjectNode> checkpointedState;
 
    private List<ObjectNode> nodes = new ArrayList<>();
 
    private static ObjectMapper MAPPER = new ObjectMapper();
 
    private StringBuilder sbLog = new StringBuilder();
 
    public HbaseSink(JobConfig jobConfig) {
        this.jobConfig = jobConfig;
    }
 
 
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.hbaseUtil = new HbaseUtil(jobConfig.getHbaseConfig());
    }
 
    /**
     * close() is triggered both on manual cancel and on an internal failure/retry. It flushes the data
     * buffered in nodes first, so that records written after the last flush but before the checkpoint are not lost.
     * There will be duplicates, however: records between the checkpoint and the failure are written again, as sketched below:
     * <pre>
     *                       flush
     *                        ^
     *                       /
     *            +--------------------------+
     * ------write------checkpoint-----------down----
     *                           +-----------+
     *                                 ^
     *                                /
     *                           will repeat
     * </pre>
     * <p>
     * For workloads where writes are idempotent, duplicated records do not affect the result.
     *
     * @throws Exception
     */
    @Override
    public void close() throws Exception {
        log.debug("execute sink close method");
        super.close();
        batchFlush();
        if (this.hbaseUtil.getConnection() != null) {
            try {
                this.hbaseUtil.getConnection().close();
            } catch (Exception e) {
                log.warn("connection close failed. error:{} ", e.getMessage());
            }
        }
    }
 
    /**
     * Called once for every record.
     *
     * @param node
     * @param context
     * @throws Exception
     */
    @Override
    public void invoke(ObjectNode node, Context context) throws Exception {
        String partition = node.get("metadata").get("partition").asText();
        String offset = node.get("metadata").get("offset").asText();
        String value = node.get("value").asText();
        log.debug("partition->{}|offset->{}|value->{}", partition, offset, value);
        nodes.add(node);
        if (nodes.size() >= jobConfig.getHbaseConfig().getBatchCount() ||
                (System.currentTimeMillis() - currentTime > jobConfig.getHbaseConfig().getInterval() && nodes.size() > 0)) {
            batchFlush();
        }
    }
 
    /**
     * Writes the data buffered in {@link HbaseSink#nodes} to HBase in one batch.
     *
     * @throws IOException
     */
    private void batchFlush() throws IOException {
        long start = System.currentTimeMillis();
        List<Put> puts = convertBatch(nodes);
        if (puts.size() == 0) {
            return;
        }
        long startPut = System.currentTimeMillis();
        hbaseUtil.putBatchData(jobConfig.getTableName(), puts);
        long end = System.currentTimeMillis();
        sbLog.append(String.format(" | batch_put(%d) cost %d ms", puts.size(), end - startPut));
        sbLog.append(String.format(" | batch_total(%d) cost %d ms", puts.size(), end - start));
        sbLog.append(String.format(" | per record cost %d ms", (end - start) / puts.size()));
        log.debug(sbLog.toString());
        currentTime = System.currentTimeMillis();
        sbLog = new StringBuilder();
        nodes.clear();
    }
 
    /**
     * Converts a batch of records into HBase Puts, applying the configured joins.
     *
     * @param objectNodes a batch of records
     * @return the Puts for the batch
     */
    private List<Put> convertBatch(List<ObjectNode> objectNodes) throws IOException {
        Map<Put, Map<String, String>> puts = new HashMap<>(objectNodes.size());
        // for each join table, the set of rowkeys needed by this batch
        Map<String, Set<String>> joinKeys = new HashMap<>(objectNodes.size());
 
        for (ObjectNode node : objectNodes) {
            Map<String, String> keyValues = getKeyValues(node);
 
            // build the composite rowkey
            List<String> rowKeyValues = new ArrayList<>();
            jobConfig.getRowKeyColumns().forEach(e -> rowKeyValues.add(keyValues.get(e)));
            if (rowKeyValues.stream().anyMatch(Objects::isNull)) {
                // skip this record if any rowkey component is null
                log.warn("columns which make up the rowKey contain a null value");
                continue;
            }
            String rowKey = String.join(jobConfig.getRowKeyDelimiter(), rowKeyValues);
            Put put = new Put(Bytes.toBytes(rowKey));
 
            // collect, for each join table, all keys this batch needs to join on
            for (JoinTable joinTable : jobConfig.getJoinTables()) {
 
                joinKeys.compute(joinTable.getTableName(), (k, v) -> {
                    // keyValues.get(joinTable.getJoinKey()) may be null, so check before adding
                    if (keyValues.get(joinTable.getJoinKey()) != null) {
                        if (v == null) {
                            v = new HashSet<>();
                            v.add(keyValues.get(joinTable.getJoinKey()));
                        } else {
                            v.add(keyValues.get(joinTable.getJoinKey()));
                        }
                    }
                    return v;
                });
            }
 
 
            // columns coming from the original topic
            keyValues.forEach((k, v) -> {
                String family = k.split(":")[0];
                String column = k.split(":")[1];
                put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column), v != null ? Bytes.toBytes(v) : null);
            });
 
            puts.put(put, keyValues);
        }
 
        // the following runs only when joins are configured
        for (JoinTable joinTable : jobConfig.getJoinTables()) {
            // all keys this batch needs to join on for this table
            Set<String> keys = joinKeys.get(joinTable.getTableName());
            if (keys == null || keys.isEmpty()) {
                // no record in this batch carried a non-null join key for this table
                continue;
            }
            List<Get> gets = new ArrayList<>();
            // map each rowkey to its join Result
            Map<String, Result> keyResults = new HashMap<>(keys.size());

            // build the List<Get> for the batch get
            keys.forEach(e -> {
                Get get = new Get(Bytes.toBytes(e));
                joinTable.getColumnsMapping().forEach((k, v) -> {
                    get.addColumn(Bytes.toBytes(k.split(":")[0]), Bytes.toBytes(k.split(":")[1]));
                });
                gets.add(get);
            });
 
 
            long start = System.currentTimeMillis();
            // execute the batch get
            Result[] results = hbaseUtil.batchGet(joinTable.getTableName(), gets);
            for (Result result : results) {
                if (result != null) {
                    keyResults.put(Bytes.toString(result.getRow()), result);
                }
            }
            long end = System.currentTimeMillis();
 
            sbLog.append(String.format("| batch_get %s(%d) %d ms", joinTable.getTableName(), keys.size(), (end - start)));
            // for each Put built earlier, look up the join Result for its join key and copy the mapped columns into the Put
            puts.forEach((put, keyValues) -> {
                Result result = keyResults.get(keyValues.get(joinTable.getJoinKey()));
                if (result != null) {
                    joinTable.getColumnsMapping().forEach((k, v) -> {
                        byte[] columnValue = result.getValue(Bytes.toBytes(k.split(":")[0]), Bytes.toBytes(k.split(":")[1]));
                        put.addColumn(Bytes.toBytes(v.split(":")[0]), Bytes.toBytes(v.split(":")[1]), columnValue);
                    });
                }
            });
        }
 
 
        return new ArrayList<>(puts.keySet());
    }
 
    /**
     * Parses each message into <key, value> pairs according to the column mapping in the configuration,
     * where the key is the configured column name (including the column family).
     * Two message formats are supported: CSV and JSON strings.
     * While processing, the Kafka message key is merged into the value by default: for CSV it occupies
     * index 0, for JSON it is exposed as the default kafka_key field.
     *
     * @param node the Kafka message received by Flink
     * @return the values keyed by column name
     */
    private Map<String, String> getKeyValues(ObjectNode node) {
        Map<String, String> indexColumns = jobConfig.getIndexColumnMapping();
        String key = node.get("key") == null ? "" : node.get("key").asText();
        String value = node.get("value") == null ? "" : node.get("value").asText();
 
 
        Map<String, String> keyValues = new HashMap<>(8);
 
        ValueFormat valueFormat = jobConfig.getKafkaConfig().getValueFormat();
        switch (valueFormat) {
            case CSV:
                // concatenate key and value; the Kafka key takes index 0 in the mapping
                String input = key + jobConfig.getKafkaConfig().getDelimiter() + value;
                String[] columnValues = StringUtils.splitPreserveAllTokens(input, jobConfig.getKafkaConfig().getDelimiter());
 
                // write each indexed value under its mapped column name (family:qualifier)
                for (Map.Entry<String, String> entry : indexColumns.entrySet()) {
                    try {
                        keyValues.put(entry.getValue(), columnValues[Integer.valueOf(entry.getKey())]);
                    } catch (Exception e) {
 
                        log.warn("index {} out of boundary.", entry.getKey(), e);
                    }
                }
 
                break;
            case JSON:
            default:
                // add the Kafka key to the JSON node so it is handled uniformly
                try {
 
                    ObjectNode jsonNode = (ObjectNode) MAPPER.readTree(value);
 
                    jsonNode.put("kafka_key", key);
 
                    // write each configured field under its mapped column name (family:qualifier)
                    indexColumns.forEach((k, v) -> {
                        if (jsonNode.get(k) != null) {
                            keyValues.put(v, jsonNode.get(k).asText());
                        }
                    });
                } catch (IOException e) {
                    keyValues.clear();
                    indexColumns.forEach((k, v) -> keyValues.put(v, null));
                    String partition = node.get("metadata").get("partition").asText();
                    String offset = node.get("metadata").get("offset").asText();
                    String topic = node.get("metadata").get("topic").asText();
                    log.warn("this json record failed.topic->{},partition->{},offset->{},value->{}", topic, partition, offset, value);
                }
                break;
        }
        return keyValues;
    }
 
    /**
     * Runs at the same frequency as the checkpoint interval configured in {@link FlinkRunner}.
     *
     * @param context
     * @throws Exception
     */
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedState.clear();
        for (ObjectNode element : nodes) {
            checkpointedState.add(element);
        }
        log.debug("execute snapshot at {}", System.currentTimeMillis());
    }
 
    /**
     * On an internal failure and restart, the data saved by snapshotState is restored from checkpointedState.
     * If the job was cancelled manually, or resubmitted after its retries were exhausted, checkpointedState
     * is a fresh object with no data.
     *
     * @param context
     * @throws Exception
     */
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<ObjectNode> descriptor =
                new ListStateDescriptor<>(
                        "hbase-sink-cp",
                        TypeInformation.of(new TypeHint<ObjectNode>() {
                        }));
 
        checkpointedState = context.getOperatorStateStore().getListState(descriptor);
 
        if (context.isRestored()) {
            for (ObjectNode element : checkpointedState.get()) {
                nodes.add(element);
            }
        }
        log.info("initialState {}  record", nodes.size());
    }
}
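
For context, a rough sketch of how the sink could be wired into a Flink job. The deserialization schema (ObjectNodeKafkaSchema) is a placeholder for whatever schema the project uses to produce ObjectNode records with "key", "value" and "metadata" fields, and the JobConfig getter names are assumed to mirror the configuration fields above; the real entry point is FlinkRunner and may differ:

import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class JobWiringSketch {

    public static void main(String[] args) throws Exception {
        JobConfig jobConfig = ApolloConfigLoader.load();    // hypothetical helper from the Apollo sketch above

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(jobConfig.getParallelism());     // "parallelism" from the configuration

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", jobConfig.getKafkaConfig().getBootstrapServers());
        props.setProperty("group.id", jobConfig.getKafkaConfig().getGroupId());

        // ObjectNodeKafkaSchema is a placeholder: it must yield ObjectNode records with
        // "key", "value" and "metadata" fields, which is what HbaseSink#invoke expects.
        FlinkKafkaConsumer<ObjectNode> source = new FlinkKafkaConsumer<>(
                jobConfig.getKafkaConfig().getTopic(), new ObjectNodeKafkaSchema(), props);

        env.addSource(source)
           .addSink(new HbaseSink(jobConfig))
           .name(jobConfig.getJobName());

        env.execute(jobConfig.getJobName());
    }
}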

Two main problems came up during development: processing speed, and data loss on failures during batch processing.

Optimizing processing speed:

(1) Switched from the initial single-record writes to batch writes. However, the join-table columns were still fetched one record at a time, with one get call per rowkey, which was slow.

(2) Replaced the per-record gets on the join tables with batch gets, giving a 4-5x speed-up. First, fewer requests are submitted, so results come back faster; second, within a short window the recommend2.statistics records repeat the same user_id and status_id values, so batching reduces the number of distinct rowkeys that actually have to be queried.

(3) Even with the two optimizations above, throughput was still low. Logging showed that fetching rowkey values from the user_feature table was the main bottleneck, so the get requests were optimized further:

Because every column of user_feature is copied into the aggregated table, the get requests initially did not specify which column families and columns to fetch. The fix was to list all required families and columns on each Get request, which is noticeably faster, roughly a 10x improvement; writing one joined record now takes about 2-3 ms (see the sketch below). Sample log output:

| batch_get user_feature(139) 134 ms | batch_get status_feature_string(160) 59 ms | batch_put(200) cost 195 ms | batch_total(200) cost 433 ms | per record cost 2 ms
| batch_get user_feature(134) 132 ms | batch_get status_feature_string(169) 56 ms | batch_put(200) cost 201 ms | batch_total(200) cost 434 ms | per record cost 2 ms
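
For reference, a minimal sketch of what such a batch get with explicitly requested columns looks like against the HBase client API; the helper below is illustrative and not the project's actual HbaseUtil:

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class BatchGetSketch {

    /**
     * One batched call for the whole set of rowkeys instead of one RPC per rowkey,
     * requesting only the configured columns rather than whole rows -- the two
     * optimizations described above.
     */
    public static Result[] batchGet(Connection connection, String tableName,
                                    Set<String> rowKeys, Map<String, String> columnsMapping) throws IOException {
        List<Get> gets = new ArrayList<>(rowKeys.size());
        for (String rowKey : rowKeys) {
            Get get = new Get(Bytes.toBytes(rowKey));
            // restrict the Get to the configured family:qualifier pairs
            for (String fromColumn : columnsMapping.keySet()) {
                String[] fc = fromColumn.split(":");
                get.addColumn(Bytes.toBytes(fc[0]), Bytes.toBytes(fc[1]));
            }
            gets.add(get);
        }
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            return table.get(gets); // batched round trip for all rowkeys
        }
    }
}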

Preventing data loss on failure:

Data loss scenario: (1) invoke has been called several times and records have accumulated in nodes, but a flush has not yet been triggered; if the Flink job then fails and restarts automatically for some reason, the records accumulated in nodes would be lost.

How is data loss avoided?

(1) HbaseSink implements the CheckpointedFunction interface, providing snapshotState and initializeState. snapshotState runs at the checkpoint frequency configured in FlinkRunner; each checkpoint commits the Kafka offsets and calls snapshotState, which adds the elements of nodes to checkpointedState. When the Flink job fails and restarts automatically, initializeState restores nodes from checkpointedState and processing continues (a sketch of the corresponding checkpoint and restart settings follows this list).

(2) When the job finally fails after several retries, or is stopped manually, and is then resubmitted, checkpointedState is a fresh object that does not hold the nodes from the failed or stopped run, so data could still be lost in this case. Since close() is called both on automatic restart after a failure and on manual stop, batchFlush() is invoked in close() to write the buffer out before shutting down. On restart, however, messages from the last checkpoint up to the stop are processed again.
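
A minimal sketch of the checkpoint and restart settings this at-least-once behaviour relies on; the 60-second interval and three-retry strategy are illustrative values, not the project's actual FlinkRunner settings:

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.concurrent.TimeUnit;

public class CheckpointSettingsSketch {

    public static void configure(StreamExecutionEnvironment env) {
        // Each checkpoint commits the Kafka offsets and triggers HbaseSink#snapshotState,
        // which copies the buffered nodes into operator state.
        env.enableCheckpointing(60_000L, CheckpointingMode.AT_LEAST_ONCE); // illustrative interval

        // On failure the job restarts automatically and HbaseSink#initializeState restores the buffer;
        // after the retries are exhausted (or on manual cancel) a fresh submission starts with empty state.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));
    }
}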
