Coder Social home page Coder Social logo

rocketmqdeduplistener's Introduction

RocketMQDedupListener

通用的RocketMQ消息幂等去重消费者工具类,开箱即用

  1. 支持利用Redis或者MySQL做幂等表。
  2. 支持业务主键去重或消息ID去重(默认)
  3. 支持消息重复并发控制(重复的消息消费成功/失败前,不会同时消费第二条)
  4. 接近于EXACTLY-ONCE语义(消息只会且仅会被成功消费一次),极端场景下则为ATLEAST-ONCE语义(消息至少被成功消费一次,不会因为去重的增强而丢失消息)

内置去重原理

见以下流程图

image

去重实现的来龙去脉

可以参考本人在官方微信发表的博文: RocketMQ消息幂等的通用解决方案

Quick Start

1、继承DedupConcurrentListener类,实现消费回调和去重键的设置回调

public class SampleListener extends DedupConcurrentListener {

    public SampleListener(DedupConfig dedupConfig) {
        super(dedupConfig);
    }

    //基于什么做消息去重,每一类不同的消息都可以不一样,做去重之前会尊重此方法返回的值
    @Override
    protected String dedupMessageKey(MessageExt messageExt) {
        //为了简单示意,这里直接使用消息体作为去重键,正式使用时候不建议这样使用
        if ("TEST-TOPIC".equals(messageExt.getTopic())) {
            return new String(messageExt.getBody());
        } else {//其他使用默认的配置(消息id)
            return super.dedupMessageKey(messageExt);
        }
    }

    @Override
    protected boolean doHandleMsg(MessageExt messageExt) {
        switch (messageExt.getTopic()) {
            case "TEST-TOPIC":
                log.info("假装消费很久....{} {}", new String(messageExt.getBody()), messageExt);
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {}
                break;
        }
        return true;
    }
}

2、使用此实例启动RocketMQ 消费者


            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TEST-APP1");
            consumer.subscribe("TEST-TOPIC", "*");

            //START:区别于普通RocketMQ使用的代码
            String appName = consumer.getConsumerGroup();//针对什么应用做去重,相同的消息在不同应用的去重是隔离处理的
            StringRedisTemplate stringRedisTemplate = null;// 这里省略获取StringRedisTemplate的过程,具体的消息幂等表会保存到Redis中
            DedupConfig dedupConfig = DedupConfig.enableDedupConsumeConfig(appName, stringRedisTemplate);
            DedupConcurrentListener messageListener = new SampleListener(dedupConfig);
            //END:区别于普通RocketMQ使用的代码


            consumer.registerMessageListener(messageListener);
            consumer.start();
        

注:

  1. 以上省略了RocketMQ消费者的配置及StringRedisTemplate的获取过程,需要使用者自己准备。
  2. RocketMQDedupListener支持使用Redis或MySQL进行去重,更多使用详情请见SampleListener.java

使用场景测试

以下是部分实验的输出日志及现象,读者可以参考观察实验(基于SampleListener.java的实现)

一、测试普通的消息重复:

  1. 模拟正常消息:发送消息到TEST-TOPIC, 报文为,test-ljj-msg1234
  2. 模拟重复消息:隔几秒后(这个例子需要大于3秒),再发送消息到TEST-TOPIC,报文一样是test-ljj-msg1234

日志输出如下:

[INFO] 2020-05-15 11:06:17,697 []  >>> 假装消费很久....test-ljj-msg1234 MessageExt [queueId=1, storeSize=169, queueOffset=0, sysFlag=0, bornTimestamp=1589511454575, bornHost=/10.13.32.179:52637, storeTimestamp=1589511454576, storeHost=/10.13.32.179:10911, msgId=0A0D20B300002A9F000000003EEA31B0, commitLogOffset=1055535536, bodyCRC=1038040938, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TEST-TOPIC, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1589511977328, UNIQ_KEY=0A0D20B3632A5B2133B14A730F6F014A, WAIT=true}, body=16]]
[INFO] 2020-05-15 11:06:20,748 []  >>> consume [1] msg(s) all successfully
[WARN] 2020-05-15 11:06:26,504 []  >>> message has been consumed before! dedupKey = DedupElement={"application":"repay-platform-core","msgUniqKey":"test-ljj-msg1234","tag":"","topic":"TEST-TOPIC"}, msgId : 0A0D20B3632A5B2133B14A7332DB014B , so just ack. RedisPersist
[INFO] 2020-05-15 11:06:26,504 []  >>> consume [1] msg(s) all successfully

说明:

可以看到第二条消息被直接幂等掉了,没有进入业务的测试代码

测试并发重复消费:

  1. 模拟正常消息:发送消息到TEST-TOPIC, 报文为 test-ljj-msg123
  2. 模拟重复消息:马上(这个例子需要小于3秒)再发送消息到TEST-TOPIC,报文一样是 test-ljj-msg123

由于这里需要一些特殊说明,以下日志增加了注释

33秒第一条消息到达,这里消息会消费3秒
[INFO] 2020-05-15 11:07:33,756 []  >>> 假装消费很久....test-ljj-msg123 MessageExt [queueId=1, storeSize=168, queueOffset=2, sysFlag=0, bornTimestamp=1589511530879, bornHost=/10.13.32.179:52651, storeTimestamp=1589511530881, storeHost=/10.13.32.179:10911, msgId=0A0D20B300002A9F000000003EEA3302, commitLogOffset=1055535874, bodyCRC=146853239, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TEST-TOPIC, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, CONSUME_START_TIME=1589512053623, UNIQ_KEY=0A0D20B3632A5B2133B14A74397F014C, WAIT=true}, body=15]]

35秒重复消息到达,发现前面的消息还在消费
[WARN] 2020-05-15 11:07:35,884 []  >>> the same message is considered consuming, try consume later dedupKey : DedupElement={"application":"repay-platform-core","msgUniqKey":"test-ljj-msg123","tag":"","topic":"TEST-TOPIC"}, 0A0D20B3632A5B2133B14A7441FB014D, RedisPersit
消费按消费失败处理,触发延迟消费
[WARN] 2020-05-15 11:07:35,884 []  >>> consume [1] msg(s) fails, ackIndex = [-1]

36秒第一条消息消费成功
[INFO] 2020-05-15 11:07:36,801 []  >>> consume [1] msg(s) all successfully

46秒第二条消息延迟消费开始,发现这条消息已经被成功消费
[WARN] 2020-05-15 11:07:46,024 []  >>> message has been consumed before! dedupKey = DedupElement={"application":"repay-platform-core","msgUniqKey":"test-ljj-msg123","tag":"","topic":"TEST-TOPIC"}, msgId : 0A0D20B3632A5B2133B14A7441FB014D , so just ack. RedisPersit

直接按照消费成功处理
[INFO] 2020-05-15 11:07:46,024 []  >>> consume [1] msg(s) all successfully

说明:

可以看到第二条消息在第一条消息消费的过程中就投递到消费者了,这时候去重逻辑做了并发控制,保证了业务代码的安全。


MYSQL去重支持

若希望使用MYSQL存储消息消费记录,使用上仅需把StringRedisTemplate改成JdbcTemplate:

        JdbcTemplate jdbcTemplate = null;// 这里省略获取JDBCTemplate的过程,幂等表将使用MySQL的t_rocketmq_dedup存储
        DedupConfig dedupConfig = DedupConfig.enableDedupConsumeConfig(appName, jdbcTemplate);

同时需要预先建立一张消息去重表,结构如下:

-- ----------------------------
-- Table structure for t_rocketmq_dedup
-- ----------------------------
DROP TABLE IF EXISTS `t_rocketmq_dedup`;
CREATE TABLE `t_rocketmq_dedup` (
`application_name` varchar(255) NOT NULL COMMENT '消费的应用名(可以用消费者组名称)',
`topic` varchar(255) NOT NULL COMMENT '消息来源的topic(不同topic消息不会认为重复)',
`tag` varchar(16) NOT NULL COMMENT '消息的tag(同一个topic不同的tag,就算去重键一样也不会认为重复),没有tag则存""字符串',
`msg_uniq_key` varchar(255) NOT NULL COMMENT '消息的唯一键(建议使用业务主键)',
`status` varchar(16) NOT NULL COMMENT '这条消息的消费状态',
`expire_time` bigint(20) NOT NULL COMMENT '这个去重记录的过期时间(时间戳)',
UNIQUE KEY `uniq_key` (`application_name`,`topic`,`tag`,`msg_uniq_key`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=COMPACT;


说明:因为需要支持不同的应用,所以需要存储application_name,因为同一个业务主键可能来自不同的topic/tag,所以也需要存储起来。

一直消费失败会否引起死循环

不会。失败/消费中触发的延迟消费依赖与RocketMQ原生的重试机制,默认是16次。如果有希望调整延迟的时间和重试次数,需要自行调整Consumer配置。

关于作者

Apache RocketMQ Committer,知乎专栏 RocketMQ详解作者,RoceketMQ官微投稿者

rocketmqdeduplistener's People

Contributors

jaskey avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

rocketmqdeduplistener's Issues

消息成功消费后的问题

大佬,你好。我看在DedupConcurrentListener类的consumeMessage方法中若消息全部成功消费只是简单了打印下了log。如果像流程图那样更新消息表状态为成功,那样可能在更新消息表的过程中宕机了,那么还是会出现消息重复的问题吧。请问一般如何解决?

关于业务逻辑与去重表更新操作不在一个原子操作中的问题探讨

大佬,你好,非常感谢你提供的消息幂等处理 Demo!
在阅读你的实现中,我看了下com.github.jaskey.rocketmq.strategy.DedupConsumeStrategy#doHandleMsgAndUpdateStatus 方法,发现业务逻辑和去重表更新操作不在一个原子操作中,如果出现业务逻辑处理成功后没有更新去重表,怎么办?
还是说这个发生的机率很小,是业务允许的范围之内?
就像 RocketMQ 无法保证严格的顺序消息,但普通的顺序消息一般业务能够接收一样?

mysql作为存储数据没有清理

如果使用msyql作为存储方案,缺少删除过期数据的地方啊,这样还需要增加个定时任务,删除过期数据?

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.