Coder Social home page Coder Social logo

xuxueli / xxl-mq Goto Github PK

View Code? Open in Web Editor NEW
433.0 46.0 229.0 9.25 MB

A lightweight distributed message queue framework.(分布式消息队列XXL-MQ)

Home Page: http://www.xuxueli.com/xxl-mq/

License: GNU General Public License v3.0

Java 99.94% Dockerfile 0.06%
java message-queue broker queue broadcast distributed

xxl-mq's Introduction

XXL-MQ

XXL-MQ, A lightweight distributed message queue framework.
-- Home Page --

Introduction

XXL-MQ is a lightweight distributed message queue framework. With features of "horizontal expansion, high availability, massive data accumulation, single TPS over 100,000, millisecond delivery". Support for "concurrent message, serial message, broadcast message, delay message, transaction message, failure retry, timeout control" and other message features. Now, it's already open source, real "out-of-the-box".

XXL-MQ是一款轻量级分布式消息队列,拥有 "水平扩展、高可用、海量数据堆积、单机TPS过10万、毫秒级投递" 等特性, 支持 "并发消息、串行消息、广播消息、延迟消息、事务消费、失败重试、超时控制" 等消息特性。现已开放源代码,开箱即用。

Documentation

Features

  • 1、简单易用: 一行代码即可发布一条消息; 一行注解即可订阅一个消息主题;
  • 2、轻量级: 部署简单,不依赖第三方服务,一分钟上手;
  • 3、水平扩展:消息中心支持无限水平扩展,这里的水平扩展包括两方面:消息生产能力、消息消费能力;通过集群扩展线性提升消息吞吐能力;
  • 4、高可用:消息中心能够忍受部分示例失效,不影响整个集群的可用性。通过内置注册中心可以实现秒级摘除失效节点,消息服务动态转移;
  • 5、消息持久化:全部消息持久化存储,消息中心支持通过配置选择是否清理过期消息。
  • 6、强数据安全:消息数据存储在DB中,可事务保障数据安全,防止消息数据丢失;
  • 7、海量数据堆积:消息数据存储在DB中,原生兼容支持 "MySQL、TIDB" 两种存储方式,前者支持千万级消息堆积,后者支持百亿级别消息堆积(TIDB理论上无上限);
  • 8、单机TPS过10W:单机TPS受限于DB存储方式,选型 "MySQL" 时单机TPS过万,选型 "TIDB" 时单机TPS过10万;
  • 9、毫秒级投递延迟:消息中心与客户端通过RPC的方式进行消息通讯,毫秒级延时;
  • 10、多种消息模式:
    • 并行消息:消息平均分配在该主题在线消费者,分片方式并行消费;适用于吞吐量较大的消息场景,如邮件发送、短信发送等业务逻辑
    • 串行消息:消息固定分配给该主题在线消费者中其中一个,FIFO方式串行消费;适用于严格限制并发的消息场景,如秒杀、抢单等排队业务逻辑;
    • 广播消息:消息将会广播发送给该主题在线消费者分组,全部分组都会消费该消息,但是一个分组下只会消费一次;适用于广播场景,如广播更新缓存等
  • 11、延时消息: 支持设置消息的延迟生效时间, 到达设置的生效时间时该消息才会被消费;适用于延时消费场景,如订单超时取消等;
  • 12、事务消费: 消费者开启事务开关后,消息事务性保证只会成功执行一次;
  • 13、失败重试: 支持设置消息的重试次数, 在消息执行失败后将会按照设置的值进行消息重试执行,直至重试次数耗尽或者执行成功;
  • 14、超时控制: 支持自定义消息超时时间,消息消费超时将会主动中断;
  • 15、消息可见: 系统中每一条消息可通过Web界面在线查看,甚至支持编辑消息内容和消息状态;
  • 16、消息可追踪: 支持追踪每一条消息的执行路径, 便于排查业务问题;
  • 17、消息失败告警:支持以Topic粒度监控消息,存在失败消息时主动推送告警邮件;默认提供邮件方式失败告警,同时预留扩展接口,可方面的扩展短信、钉钉等告警方式;
  • 18、容器化:提供官方docker镜像,并实时更新推送dockerhub,进一步实现产品开箱即用;
  • 19、访问令牌(accessToken):为提升系统安全性,消息中心和客户端进行安全性校验,双方AccessToken匹配才允许通讯;

Development

于2015年中,我在github上创建XXL-MQ项目仓库并提交第一个commit,随之进行系统结构设计,UI选型,交互设计……

至今,XXL-MQ已接入多家公司的线上产品线,截止2016-09-18为止,XXL-MQ已接入的公司包括不限于:

- 1、农信互联
- ……

更多接入的公司,欢迎在 登记地址 登记,登记仅仅为了产品推广。

欢迎大家的关注和使用,XXL-MQ也将拥抱变化,持续发展。

Communication

Contributing

Contributions are welcome! Open a pull request to fix a bug, or open an Issue to discuss a new feature or change.

欢迎参与项目贡献!比如提交PR修复一个bug,或者新建 Issue 讨论新特性或者变更。

Copyright and License

This product is open source and free, and will continue to provide free community technical support. Individual or enterprise users are free to access and use.

  • Licensed under the GNU General Public License (GPL) v3.
  • Copyright (c) 2015-present, xuxueli.

产品开源免费,并且将持续提供免费的社区技术支持。个人或企业内部可自由的接入和使用。

Donate

No matter how much the amount is enough to express your thought, thank you very much :) To donate

无论金额多少都足够表达您这份心意,非常感谢 :) 前往捐赠

xxl-mq's People

Contributors

xuxueli avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

xxl-mq's Issues

建议手动设定线程池

看了代码里面留有大量的Executors,还是建议手动设定方式。毕竟中间件还是要求更好的质量。

Topic下的待处理消息数据在1W条左右,xxl-rpc会有invoke error

2022-05-28 11:11:33,694 sak-mq-comsumer: INFO -- rpc.remoting.invoker.reference.XxlRpcReferenceBean -- >>>>>>>>>>> xxl-rpc, invoke error, address:192.168.0.150:7080, XxlRpcRequestXxlRpcRequest{requestId='348c4076-2f68-46e0-acf1-1cd760cdb846', createMillisTime=1653707483694, accessToken='null', className='com.xxl.mq.client.broker.IXxlMqBroker', methodName='pullNewMessage', parameterTypes=[class java.lang.String, class java.lang.String, int, int, int], parameters=[site_group_push_product_batch, DEFAULT, 1, 3, 100], version='null'}

目前的xxl-rpc-core: 1.3.0

尝试升级到1.5.0,XxlRpcReferenceBean的构造方法已被移除 netType参数也被移除了,

XxlMqConsumer的代码似乎有问题

我在阅读XxlMqConsumer类的代码时,发现如果注入一个TopicComsumer,XxlMqConsumer类在初始化时会watchTopic,但是在initTopicConsumer方法的136行中,似乎执行了ZK注册consumer的逻辑,而且用的是queueConsumerRespository,这里似乎不需要再注册consumer了吧,因为之前已经watchTopic了,即使需要注册,也应该使用topicConsumerRespository吧?
PS:现在代码的逻辑中,key使用的是consumer的topic名称,所以如果在一台机器上,启动两个监听同一Topic的TopicConsumer是不是会有问题?

PR请求Review

加了消息过期机制,发了一个PR,帮忙review下吧

在XxlProducer中增加新方法后无法找到该方法

你好,我在com.xxl.mq.client.XxlMqProducer类中增添了新的方法produceCachedMsg,然后在example工程中增加一个controller action调用该方法,但是会报以下错误:
for servlet [springmvc] in context with path [/xxl-mq-example] threw exception [Handler processing failed; nested exception is java.lang.NoSuchMethodError: com.xxl.mq.client.XxlMqProducer.produceCachedMsg(Ljava/lang/String;Ljava/util/Map;)V] with root cause
java.lang.NoSuchMethodError: com.xxl.mq.client.XxlMqProducer.produceCachedMsg(Ljava/lang/String;Ljava/util/Map;)V

然而奇怪的是,如果我在一个tomcat上同时启动broker和example两个项目,那么就可以成功的调用该方法。如果在server1中部署了broker,在server2上部署example,就会报上述错误

consumer group 和provider group设计在一起了,是有问题的

  1. provider端发送消息时,需要知道consumer的group,否则消息查询不到,group设计耦合了
  2. 测试用例中可以用,是因为都是DEFAULT
        List<XxlMqMessage> list = xxlMqMessageDao.pullNewMessage(XxlMqMessageStatus.NEW.name(), topic, group, consumerRank, consumerTotal, pagesize);

对应sql

SELECT <include refid="Base_Column_List" />
		FROM xxl_mq_message AS t
		WHERE 	t.topic = #{topic}
			AND t.group = #{group}
			AND t.status = #{newStatus}
			AND t.effectTime <![CDATA[ < ]]> NOW()
			<if test="consumerTotal > 1">
				AND (
					(
						t.shardingId <![CDATA[ = ]]> 0
						AND
						MOD(t.id, #{consumerTotal}) = #{consumerRank}
					)
					OR
					(
						t.shardingId <![CDATA[ > ]]> 0
						AND
						MOD(t.shardingId, #{consumerTotal}) = #{consumerRank}
					)
				)
			</if>
		ORDER BY t.id ASC
		LIMIT #{pagesize}

配置目录问题

/**
     * zk config file
     */
    private static final String ZK_ADDRESS_FILE = "/data/webapps/xxl-conf.properties";

配置地址可否开放出来,可以自定义

为什么基于zk来进行广播呢?

基于zk广播的确简单也容易理解,它的qps大吗? 性能应该不太好吧 ,为什么不在有多个client,让那个server发送client呢 接受到数据处理呢????

关于部署的一个小白问题

在server端部署时,是不是除了部署xxl-mq-broker这个子项目,还应该部署xxl-mq-client子项目下的broker类,因为只有这样XxlMqBroker才能通过Spring的依赖注入获得xxlMqMessageDao,并且执行init方法?

初次接触分布式CS架构,所以有些细节不是很熟悉,见笑了

XxlMqClient 中代码的一个疑问,似乎同步没有起作用

您好,我在安装并阅读代码时,在xx-mq-client包的XxlMqClient 类中发现getXxlMqService的方法中使用CountDownlatch时,countdown方法在await方法之前,这似乎违背了countDownLatch的使用方法,同步会失效。可以劳烦您解释一下吗?

单机部署可以运行,多网卡多机部署失败

多机多网卡部署,Brocker提示注册成功
“>>>>>>>>>>> xxl-rpc registe service item, ...........”
但Service端提示:
“>>>>>>>>>>> xxl-rpc,no address from service:........”
跟作者沟通过,怀疑时zk没注册成功。但把它换到同一个环境下也不行,应该是多网卡问题。希望得到解决。

broker部署报错

org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /xxl-mq/rpc
at org.apache.zookeeper.KeeperException.create(KeeperException.java:111)
at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
at org.apache.zookeeper.ZooKeeper.create(ZooKeeper.java:783)
at com.xxl.mq.client.rpc.util.ZkServiceRegistry.registerServices(ZkServiceRegistry.java:80)
at com.xxl.mq.client.rpc.netcom.NetComServerFactory$1.run(NetComServerFactory.java:50)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.