cicizz / jmqtt
An MQTT broker implemented in Java and Netty, with support for persistence and clustering
License: Apache License 2.0
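Is the official website documentation no longer maintained? I can't access the examples.
Client connection authentication and topic publish/subscribe authorization haven't been implemented yet, have they?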
Hi,
have you ever worked on an Android broker such as Moquette or something similar?
When starting the broker from an IDE, it throws an exception:
Exception in thread "main" java.lang.UnsatisfiedLinkError: C:\Users\admin\AppData\Local\Temp\librocksdbjni7773192078599615325.dll: Access is denied.
at java.lang.ClassLoader$NativeLibrary.load(Native Method)
at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1941)
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824)
at java.lang.Runtime.load0(Runtime.java:809)
at java.lang.System.load(System.java:1086)
at org.rocksdb.NativeLibraryLoader.loadLibraryFromJar(NativeLibraryLoader.java:78)
at org.rocksdb.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:56)
at org.rocksdb.RocksDB.loadLibrary(RocksDB.java:64)
at org.rocksdb.RocksDB.<clinit>(RocksDB.java:35)
at org.rocksdb.DBOptions.<clinit>(DBOptions.java:21)
at org.jmqtt.store.rocksdb.db.RDB.<init>(RDB.java:19)
at org.jmqtt.store.rocksdb.RDBMqttStore.<init>(RDBMqttStore.java:15)
at org.jmqtt.broker.BrokerController.<init>(BrokerController.java:102)
at org.jmqtt.broker.BrokerStartup.start(BrokerStartup.java:63)
at org.jmqtt.broker.BrokerStartup.main(BrokerStartup.java:25)
Why doesn't the broker call "channel.close()" after replying with the ack message indicating that authentication failed?
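System: CentOS 7, JDK 8
Description: I copied jmqtt-distribution/target/jmqtt to a VM and ran jmqtt-distribution/target/jmqtt/jmqttstart. The following exception appears, but the broker still starts. How can I fix this? The exception does not occur on macOS.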
Unable to load the library 'netty_transport_native_epoll_x86_64', trying other loading mechanism.
java.lang.UnsatisfiedLinkError: no netty_transport_native_epoll_x86_64 in java.library.path
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1860)
at java.lang.Runtime.loadLibrary0(Runtime.java:870)
at java.lang.System.loadLibrary(System.java:1122)
at io.netty.util.internal.NativeLibraryUtil.loadLibrary(NativeLibraryUtil.java:38)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at io.netty.util.internal.NativeLibraryLoader$1.run(NativeLibraryLoader.java:369)
at java.security.AccessController.doPrivileged(Native Method)
at io.netty.util.internal.NativeLibraryLoader.loadLibraryByHelper(NativeLibraryLoader.java:361)
at io.netty.util.internal.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:339)
at io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:136)
at io.netty.channel.epoll.Native.loadNativeLibrary(Native.java:219)
at io.netty.channel.epoll.Native.<clinit>(Native.java:57)
at io.netty.channel.epoll.Epoll.<clinit>(Epoll.java:39)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:264)
at top.hserver.core.server.util.EpollUtil.check(EpollUtil.java:10)
at top.hserver.core.server.HServer.run(HServer.java:51)
at top.hserver.HServerApplication.startServer(HServerApplication.java:156)
at top.hserver.HServerApplication.run(HServerApplication.java:75)
at org.jmqtt.manage.HttpServer.start(HttpServer.java:17)
at org.jmqtt.broker.BrokerController.start(BrokerController.java:242)
at org.jmqtt.broker.BrokerStartup.start(BrokerStartup.java:62)
at org.jmqtt.broker.BrokerStartup.main(BrokerStartup.java:22)
2020-12-29 15:06:59.475 DEBUG PID:[ 30499] thread: [ main] i.n.util.internal.NativeLibraryLoader [ 140] : netty_transport_native_epoll_x86_64 cannot be loaded from java.library.path, now trying export to -Dio.netty.native.workdir: /tmp
java.lang.UnsatisfiedLinkError: no netty_transport_native_epoll_x86_64 in java.library.path
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1860)
at java.lang.Runtime.loadLibrary0(Runtime.java:870)
at java.lang.System.loadLibrary(System.java:1122)
at io.netty.util.internal.NativeLibraryUtil.loadLibrary(NativeLibraryUtil.java:38)
at io.netty.util.internal.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:349)
at io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:136)
at io.netty.channel.epoll.Native.loadNativeLibrary(Native.java:219)
at io.netty.channel.epoll.Native.<clinit>(Native.java:57)
at io.netty.channel.epoll.Epoll.<clinit>(Epoll.java:39)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:264)
at top.hserver.core.server.util.EpollUtil.check(EpollUtil.java:10)
at top.hserver.core.server.HServer.run(HServer.java:51)
at top.hserver.HServerApplication.startServer(HServerApplication.java:156)
at top.hserver.HServerApplication.run(HServerApplication.java:75)
at org.jmqtt.manage.HttpServer.start(HttpServer.java:17)
at org.jmqtt.broker.BrokerController.start(BrokerController.java:242)
at org.jmqtt.broker.BrokerStartup.start(BrokerStartup.java:62)
at org.jmqtt.broker.BrokerStartup.main(BrokerStartup.java:22)
Suppressed: java.lang.UnsatisfiedLinkError: no netty_transport_native_epoll_x86_64 in java.library.path
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1860)
at java.lang.Runtime.loadLibrary0(Runtime.java:870)
at java.lang.System.loadLibrary(System.java:1122)
at io.netty.util.internal.NativeLibraryUtil.loadLibrary(NativeLibraryUtil.java:38)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at io.netty.util.internal.NativeLibraryLoader$1.run(NativeLibraryLoader.java:369)
at java.security.AccessController.doPrivileged(Native Method)
at io.netty.util.internal.NativeLibraryLoader.loadLibraryByHelper(NativeLibraryLoader.java:361)
at io.netty.util.internal.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:339)
... 14 common frames omitted
2020-12-29 15:06:59.483 DEBUG PID:[ 30499] thread: [ main] i.n.util.internal.NativeLibraryLoader [ 340] : Successfully loaded the library /tmp/libnetty_transport_native_epoll_x86_645207355296361753854.so
2020-12-29 15:06:59.492 INFO PID:[ 30499] thread: [ main] top.hserver.core.server.HServer [ 67] : HServer startup complete
Is there anything planned for future work? I'd like to help contribute.
org.jmqtt.store.memory.DefaultFlowMessageStore#containSendMsg
return this.sendCache.get(clientId).contains(msgId);
->
return this.sendCache.get(clientId).containsKey(msgId);
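To illustrate why the change above matters: the legacy ConcurrentHashMap.contains(Object) method tests map values, not keys, so looking up a message ID with it never matches. The sketch below assumes the cache is laid out as clientId -> (msgId -> message); that layout, the class name, the String payload and the null guard are illustrative assumptions, not jmqtt's actual code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the flow-message cache (assumed layout: clientId -> msgId -> payload).
final class SendCacheDemo {
    private final Map<String, ConcurrentHashMap<Integer, String>> sendCache = new ConcurrentHashMap<>();

    void cache(String clientId, int msgId, String payload) {
        sendCache.computeIfAbsent(clientId, k -> new ConcurrentHashMap<>()).put(msgId, payload);
    }

    // Buggy variant: ConcurrentHashMap.contains(Object) is a legacy alias for containsValue,
    // so the boxed message ID is compared against the stored payloads.
    boolean containSendMsgBuggy(String clientId, int msgId) {
        ConcurrentHashMap<Integer, String> msgs = sendCache.get(clientId);
        return msgs != null && msgs.contains(msgId);
    }

    // Fixed variant: look the message ID up among the keys.
    boolean containSendMsgFixed(String clientId, int msgId) {
        ConcurrentHashMap<Integer, String> msgs = sendCache.get(clientId);
        return msgs != null && msgs.containsKey(msgId);
    }

    public static void main(String[] args) {
        SendCacheDemo demo = new SendCacheDemo();
        demo.cache("client-1", 42, "hello");
        System.out.println(demo.containSendMsgBuggy("client-1", 42)); // prints "false"
        System.out.println(demo.containSendMsgFixed("client-1", 42)); // prints "true"
    }
}
```

The null guard is an addition for the sketch; the one-line original would throw a NullPointerException for a client with no cached messages.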
As the title says.
I opened several classes and none of them have any comments.
In ConnectProcessor.java, when createNewClientSession calls subscriptionStore.clearSubscription, shouldn't it also call subscriptionMatcher.unSubscribe? Otherwise the subscriptions held by subscriptionMatcher and subscriptionStore become inconsistent.
A question: the following is my analysis from a debugging session. Please correct me if it is wrong.
In ConnectProcessor, processRequest(ChannelHandlerContext ctx, MqttMessage mqttMessage) saves the will message to willMessageStore, but nothing saves the will topic to subscriptionMatcher. So when the will message is sent, subscriptionMatcher cannot match any Subscription for the will topic, and the will message is not delivered.
Set subscriptions = subscriptionMatcher.match((String) message.getHeader(MessageHeader.TOPIC));
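For context on the match call above: in MQTT, a will message is published to its will topic like any other message, so it reaches only clients whose subscription filters match that topic. The sketch below shows standard MQTT topic-filter matching with the + and # wildcards. It is an illustration only; the class and method names are hypothetical, and whether jmqtt's subscriptionMatcher works this way is an assumption. It also ignores the special rule for topics beginning with $.

```java
// Hypothetical helper illustrating MQTT topic-filter matching; not jmqtt's subscriptionMatcher.
final class TopicFilterMatch {
    static boolean matches(String filter, String topic) {
        // Limit -1 keeps trailing empty levels, e.g. "a/" has two levels.
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                return true; // multi-level wildcard matches the remainder, including the parent level
            }
            if (i >= t.length) {
                return false; // filter has more levels than the topic
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false; // literal level mismatch ("+" matches exactly one level)
            }
        }
        return f.length == t.length; // no wildcard left: level counts must agree
    }

    public static void main(String[] args) {
        System.out.println(matches("devices/+/status", "devices/d1/status")); // prints "true"
        System.out.println(matches("devices/#", "devices"));                  // prints "true"
        System.out.println(matches("devices/+", "devices/d1/status"));        // prints "false"
    }
}
```

Under this model, a will message on a topic nobody has subscribed to is simply not delivered to anyone, which is a useful thing to rule out when debugging the case described above.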
After running the package command in the specified directory:
[ERROR] Failed to execute goal on project jmqtt-broker: Could not resolve dependencies for project org.jmqtt:jmqtt-broker:jar:3.0.0: Failed to collect dependencies at org.jmqtt:jmqtt-mqtt:jar:3.0.0: Failed to read artifact descriptor for org.jmqtt:jmqtt-mqtt:jar:3.0.0: Could not transfer artifact org.jmqtt:jmqtt-mqtt:pom:3.0.0 from/to maven-default-http-blocker (http://0.0.0.0/): transfer failed for http://0.0.0.0/org/jmqtt/jmqtt-mqtt/3.0.0/jmqtt-mqtt-3.0.0.pom: Connect to 0.0.0.0:80 [/0.0.0.0] failed: Connection refused: connect -> [Help 1]
public RedisMqttStore(ClusterConfig clusterConfig) {
    this.clusterConfig = clusterConfig;
}
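Support for simple operations and maintenance features
Support for RocketMQ Bridge
Support for Kafka Bridge
Support for $SYS topic monitoring
None of these seem to be present. I pulled master.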
2021-09-13 16:59:25,036 main ERROR Could not create plugin of type class org.apache.logging.log4j.core.appender.rolling.DefaultRolloverStrategy for element DefaultRolloverStrategy: java.nio.file.InvalidPathException: Illegal char <:> at index 5: ${ctx:log_root}/${ctx:app_name} java.nio.file.InvalidPathException: Illegal char <:> at index 5: ${ctx:log_root}/${ctx:app_name}
at sun.nio.fs.WindowsPathParser.normalize(WindowsPathParser.java:182)
at sun.nio.fs.WindowsPathParser.parse(WindowsPathParser.java:153)
at sun.nio.fs.WindowsPathParser.parse(WindowsPathParser.java:77)
at sun.nio.fs.WindowsPath.parse(WindowsPath.java:94)
at sun.nio.fs.WindowsFileSystem.getPath(WindowsFileSystem.java:255)
at java.nio.file.Paths.get(Paths.java:84)
at org.apache.logging.log4j.core.appender.rolling.action.AbstractPathAction.getBasePath(AbstractPathAction.java:106)
at org.apache.logging.log4j.core.appender.rolling.action.AbstractPathAction.toString(AbstractPathAction.java:160)
at org.apache.logging.log4j.core.config.Node.toString(Node.java:153)
at org.apache.logging.log4j.core.config.plugins.visitors.PluginElementVisitor.visit(PluginElementVisitor.java:69)
at org.apache.logging.log4j.core.config.plugins.util.PluginBuilder.injectFields(PluginBuilder.java:185)
at org.apache.logging.log4j.core.config.plugins.util.PluginBuilder.build(PluginBuilder.java:121)
at org.apache.logging.log4j.core.config.AbstractConfiguration.createPluginObject(AbstractConfiguration.java:1002)
at org.apache.logging.log4j.core.config.AbstractConfiguration.createConfiguration(AbstractConfiguration.java:942)
at org.apache.logging.log4j.core.config.AbstractConfiguration.createConfiguration(AbstractConfiguration.java:934)
at org.apache.logging.log4j.core.config.AbstractConfiguration.createConfiguration(AbstractConfiguration.java:934)
at org.apache.logging.log4j.core.config.AbstractConfiguration.doConfigure(AbstractConfiguration.java:552)
at org.apache.logging.log4j.core.config.AbstractConfiguration.initialize(AbstractConfiguration.java:241)
at org.apache.logging.log4j.core.config.AbstractConfiguration.start(AbstractConfiguration.java:288)
at org.apache.logging.log4j.core.LoggerContext.setConfiguration(LoggerContext.java:618)
at org.apache.logging.log4j.core.LoggerContext.reconfigure(LoggerContext.java:691)
at org.apache.logging.log4j.core.LoggerContext.setConfigLocation(LoggerContext.java:676)
at org.jmqtt.broker.BrokerStartup.start(BrokerStartup.java:61)
at org.jmqtt.broker.BrokerStartup.main(BrokerStartup.java:28)
When will version 2.x be released?
Does it support MQTT v3.1.1 and v5.x? I suggest stating which MQTT versions are supported.
jmq_product_topic
jmq_device
The table definitions for these have not been added.
You have to create the tables yourself based on the mapper definitions.
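1. How are messages persisted?
Typical MQTT brokers such as EMQ and Mosquitto use in-memory message queues. I have long wanted to replace Mnesia, EMQ's distributed in-memory message queue, with Kafka, but I don't know how to go about it. The white paper for EMQ's paid edition shows a double-headed arrow, and I'm not sure whether that means it was replaced:
So is RabbitMQ used as the message queue here? How is that implemented?
2. Multiple consumers consuming the same topic
A message broker usually just forwards messages to the downstream services, which are inevitably clustered and have multiple workers. When multiple consumers consume the same topic, how do you guarantee that only one of them receives each message, that is, each message is consumed only once? EMQ 3.0's cluster shared subscriptions support this kind of load balancing.
3. Security verification
How is security verification done here, and at which step should it happen? My initial idea was to put a security check layer in front of the message broker, but then WebSocket traffic could not get through to the message broker. What should I do?
I hope you can offer some advice. Thanks.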
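On question 2 above, one common way to deliver each message to exactly one member of a consumer group is round-robin dispatch within the group. The sketch below is a minimal illustration of that idea only; the class and method names are hypothetical, and it is not a description of how jmqtt or EMQ implement shared subscriptions.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical shared-subscription group: each message goes to exactly one member, in rotation.
final class SharedSubscriptionGroup {
    private final List<String> members = new CopyOnWriteArrayList<>();
    private final AtomicInteger cursor = new AtomicInteger();

    void join(String clientId) {
        members.add(clientId);
    }

    void leave(String clientId) {
        members.remove(clientId);
    }

    // Returns the single client that should receive the next message, or null if the group is empty.
    String nextReceiver() {
        Object[] snapshot = members.toArray(); // stable view even if members change concurrently
        if (snapshot.length == 0) {
            return null;
        }
        // floorMod keeps the index non-negative after the counter overflows.
        int index = Math.floorMod(cursor.getAndIncrement(), snapshot.length);
        return (String) snapshot[index];
    }

    public static void main(String[] args) {
        SharedSubscriptionGroup group = new SharedSubscriptionGroup();
        group.join("worker-1");
        group.join("worker-2");
        for (int i = 0; i < 4; i++) {
            System.out.println(group.nextReceiver()); // worker-1, worker-2, worker-1, worker-2
        }
    }
}
```

Round-robin only decides who receives a message. "Consumed exactly once" additionally depends on the QoS level and on what happens when the chosen worker disconnects before acknowledging, which this sketch does not address.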
Could you provide the JMeter load-test script for WebSocket?
Getting the following error when trying to execute runbroker.cmd
command
Usage: java [-options] class [args...]
(to execute a class)
or java [-options] -jar jarfile [args...]
(to execute a jar file)
where options include:
-d32 use a 32-bit data model if available
-d64 use a 64-bit data model if available
-server to select the "server" VM
The default VM is server.
-cp <class search path of directories and zip/jar files>
-classpath <class search path of directories and zip/jar files>
A ; separated list of directories, JAR archives,
and ZIP archives to search for class files.
-D<name>=<value>
set a system property
-verbose:[class|gc|jni]
enable verbose output
-version print product version and exit
-version:<value>
Warning: this feature is deprecated and will be removed
in a future release.
require the specified version to run
-showversion print product version and continue
-jre-restrict-search | -no-jre-restrict-search
Warning: this feature is deprecated and will be removed
in a future release.
include/exclude user private JREs in the version search
-? -help print this help message
-X print help on non-standard options
-ea[:<packagename>...|:<classname>]
-enableassertions[:<packagename>...|:<classname>]
enable assertions with specified granularity
-da[:<packagename>...|:<classname>]
-disableassertions[:<packagename>...|:<classname>]
disable assertions with specified granularity
-esa | -enablesystemassertions
enable system assertions
-dsa | -disablesystemassertions
disable system assertions
-agentlib:<libname>[=<options>]
load native agent library <libname>, e.g. -agentlib:hprof
see also, -agentlib:jdwp=help and -agentlib:hprof=help
-agentpath:<pathname>[=<options>]
load native agent library by full pathname
-javaagent:<jarpath>[=<options>]
load Java programming language agent, see java.lang.instrument
-splash:<imagepath>
show splash screen with specified image
See http://www.oracle.com/technetwork/java/javase/documentation/index.html for more details.
The build was successful but unable to run the broker. Is there anything that we may be missing in execution steps?
producer:
Exception in thread "main" Too many publishes in progress (32202)
at org.eclipse.paho.client.mqttv3.internal.ClientState.send(ClientState.java:524)
at org.eclipse.paho.client.mqttv3.internal.ClientComms.internalSend(ClientComms.java:161)
at org.eclipse.paho.client.mqttv3.internal.ClientComms.sendNoWait(ClientComms.java:191)
at org.eclipse.paho.client.mqttv3.MqttAsyncClient.publish(MqttAsyncClient.java:1251)
at org.eclipse.paho.client.mqttv3.MqttClient.publish(MqttClient.java:570)
at org.jmqtt.java.Producer.main(Producer.java:20)
broker:
2021-09-13 17:22:31,488 [/// - ] DEBUG remotingLog - [Remoting] -> receive mqtt code,type:12,name:PINGREQ
2021-09-13 17:22:50,872 [/// - ] DEBUG remotingLog - [Remoting] -> receive mqtt code,type:12,name:PINGREQ
2021-09-13 17:23:31,488 [/// - ] DEBUG remotingLog - [Remoting] -> receive mqtt code,type:12,name:PINGREQ
2021-09-13 17:23:50,887 [/// - ] DEBUG remotingLog - [Remoting] -> receive mqtt code,type:12,name:PINGREQ
2021-09-13 17:24:31,488 [/// - ] DEBUG remotingLog - [Remoting] -> receive mqtt code,type:12,name:PINGREQ
2021-09-13 17:24:50,900 [/// - ] DEBUG remotingLog - [Remoting] -> receive mqtt code,type:12,name:PINGREQ
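I started two nodes locally: A with TCP port 1883 and canonical.port = 25251, and B with TCP port 2883 and canonical.port = 25252. A device connects to node A, and a message is published through node B to the topic the device subscribes to, but node A never receives the event for the published message.
Under high concurrency, other threads generate a new messageId when they send innerMsg, but the PubAck must use the original messageId. The original messageId therefore needs to be saved in a variable at this point. The change is as follows: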
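// Capture the original message ID before processing, because innerMsg may be assigned a new one.
int originMessageId = innerMsg.getMsgId();
processMessage(innerMsg);
log.debug("[PubMessage] -> Process qos1 message,clientId={}",innerMsg.getClientId());
// Acknowledge with the ID the publisher originally sent.
MqttPubAckMessage pubAckMessage = MessageUtil.getPubAckMessage(originMessageId);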
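Good morning,
When the same client connects to different brokers, shouldn't a mutual kick-off feature be supported? Is this meant to be handled in org.jmqtt.broker.processor.ConnectProcessor.void newClientNotify(ClientSession clientSession), by notifying every broker in the cluster?
Suppose the nodes are 192.168.1.1, 192.168.1.2, and 192.168.1.3.
The cluster configuration on 192.168.1.1 is: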
currentNodeIp="192.168.1.1"
nodeName="node_1"
groupServerPort=8880
groupNodes="192.168.1.1:8080;192.168.1.2:8080;192.168.1.3:8080"
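Is this configuration correct? Also, does the cluster provide message backup and failover?
From the code, message flow appears to be entirely local; I can't see how Akka's clustering capability is used.
Does the author intend to use Akka clustering to store the pub/sub subscription relationships, or the messages?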
return this.sendCache.get(clientId).contains(msgId);
should be
return this.sendCache.get(clientId).containsKey(msgId);
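Changing the connection string to jdbc:mysql://127.0.0.1:3306/jmqtt?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8 makes it start normally.
Suppose a client establishes a TCP connection (channelActive) but never sends CONNECT. The code doesn't seem to handle idle events. Won't the client then stay connected indefinitely even though it is not a valid MQTT client?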
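One way to address the concern above is a connect timeout: schedule a close when the TCP connection becomes active, and cancel it once a valid CONNECT arrives. The sketch below uses only the JDK. The class name, method names and the Runnable standing in for channel.close() are all hypothetical and not part of jmqtt. In a Netty pipeline the same effect is usually achieved with IdleStateHandler or by scheduling on the channel's event loop; how that would be wired into jmqtt is not shown here.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: closes connections that never send an MQTT CONNECT in time.
final class ConnectTimeoutGuard implements AutoCloseable {
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "connect-timeout");
                t.setDaemon(true); // do not keep the JVM alive just for the timer
                return t;
            });
    private final long timeoutMillis;

    ConnectTimeoutGuard(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Call when the TCP connection becomes active; closeAction stands in for channel.close().
    ScheduledFuture<?> onChannelActive(Runnable closeAction) {
        return timer.schedule(closeAction, timeoutMillis, TimeUnit.MILLISECONDS);
    }

    // Call when a valid CONNECT packet has been processed for that connection.
    void onConnectReceived(ScheduledFuture<?> pendingClose) {
        pendingClose.cancel(false);
    }

    @Override
    public void close() {
        timer.shutdownNow();
    }
}
```

A handler would keep the returned ScheduledFuture per connection, call onConnectReceived after a successful CONNECT, and let the scheduled close fire otherwise.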