Coder Social home page Coder Social logo

cicizz / jmqtt Goto Github PK

View Code? Open in Web Editor NEW
503.0 43.0 184.0 4.97 MB

A MQTT broker,implemented by java and netty,support persistence and cluster

License: Apache License 2.0

Java 74.22% Shell 0.43% JavaScript 24.08% HTML 1.12% Dockerfile 0.16%
mqtt iot mq java

jmqtt's People

Contributors

arrogant95 avatar arrogantdyc avatar bewindoweb avatar cicizz avatar dependabot[bot] avatar gutan avatar huangdayu avatar lybxkl avatar muety avatar sfclibby avatar stone99 avatar swwheihei avatar yanceking avatar zcents avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

jmqtt's Issues

官网文档

请问官网文档是不维护了吗?访问不了案例

broker for android

hi
have you ever work on android broker such as moquette or something like that ?

UnsatisfiedLinkError: librocksdbjni7773192078599615325.dll

When start a broker by IDE, it throws exception:

Exception in thread "main" java.lang.UnsatisfiedLinkError: C:\Users\admin\AppData\Local\Temp\librocksdbjni7773192078599615325.dll: 拒绝访问。
	at java.lang.ClassLoader$NativeLibrary.load(Native Method)
	at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1941)
	at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824)
	at java.lang.Runtime.load0(Runtime.java:809)
	at java.lang.System.load(System.java:1086)
	at org.rocksdb.NativeLibraryLoader.loadLibraryFromJar(NativeLibraryLoader.java:78)
	at org.rocksdb.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:56)
	at org.rocksdb.RocksDB.loadLibrary(RocksDB.java:64)
	at org.rocksdb.RocksDB.<clinit>(RocksDB.java:35)
	at org.rocksdb.DBOptions.<clinit>(DBOptions.java:21)
	at org.jmqtt.store.rocksdb.db.RDB.<init>(RDB.java:19)
	at org.jmqtt.store.rocksdb.RDBMqttStore.<init>(RDBMqttStore.java:15)
	at org.jmqtt.broker.BrokerController.<init>(BrokerController.java:102)
	at org.jmqtt.broker.BrokerStartup.start(BrokerStartup.java:63)
	at org.jmqtt.broker.BrokerStartup.main(BrokerStartup.java:25)

Why not to abort the connection that failed to authenticate

I mean why the broker does not call the method "channel.close()" after replying to the ack message that the authentication failed.

if(!versionValid(mqttVersion)){
returnCode = MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION;
} else if(!clientIdVerfy(clientId)){
returnCode = MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED;
} else if(onBlackList(RemotingHelper.getRemoteAddr(ctx.channel()),clientId)){
returnCode = MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED;
} else if(!authentication(clientId,userName,password)){
returnCode = MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD;
} else{

MqttConnAckMessage ackMessage = MessageUtil.getConnectAckMessage(returnCode,sessionPresent);
ctx.writeAndFlush(ackMessage);
log.info("[CONNECT] -> {} connect to this mqtt server",clientId);
reConnect2SendMessage(clientId);

Unable to load the library 'netty_transport_native_epoll_x86_64', trying other loading mechanism.

系统:CentsOS 7,JDK8
描述:把jmqtt-distribution/target/jmqtt拷贝VM, 运行jmqtt-distribution/target/jmqtt/jmqttstart,出现以下异常,但是可以启动,请问怎么修改这个呢?在MAC中没有此异常。
Unable to load the library 'netty_transport_native_epoll_x86_64', trying other loading mechanism.
java.lang.UnsatisfiedLinkError: no netty_transport_native_epoll_x86_64 in java.library.path
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1860)
at java.lang.Runtime.loadLibrary0(Runtime.java:870)
at java.lang.System.loadLibrary(System.java:1122)
at io.netty.util.internal.NativeLibraryUtil.loadLibrary(NativeLibraryUtil.java:38)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at io.netty.util.internal.NativeLibraryLoader$1.run(NativeLibraryLoader.java:369)
at java.security.AccessController.doPrivileged(Native Method)
at io.netty.util.internal.NativeLibraryLoader.loadLibraryByHelper(NativeLibraryLoader.java:361)
at io.netty.util.internal.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:339)
at io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:136)
at io.netty.channel.epoll.Native.loadNativeLibrary(Native.java:219)
at io.netty.channel.epoll.Native.(Native.java:57)
at io.netty.channel.epoll.Epoll.(Epoll.java:39)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:264)
at top.hserver.core.server.util.EpollUtil.check(EpollUtil.java:10)
at top.hserver.core.server.HServer.run(HServer.java:51)
at top.hserver.HServerApplication.startServer(HServerApplication.java:156)
at top.hserver.HServerApplication.run(HServerApplication.java:75)
at org.jmqtt.manage.HttpServer.start(HttpServer.java:17)
at org.jmqtt.broker.BrokerController.start(BrokerController.java:242)
at org.jmqtt.broker.BrokerStartup.start(BrokerStartup.java:62)
at org.jmqtt.broker.BrokerStartup.main(BrokerStartup.java:22)
2020-12-29 15:06:59.475 DEBUG PID:[ 30499] 线程名: [ main] i.n.util.internal.NativeLibraryLoader [ 140] : netty_transport_native_epoll_x86_64 cannot be loaded from java.library.path, now trying export to -Dio.netty.native.workdir: /tmp
java.lang.UnsatisfiedLinkError: no netty_transport_native_epoll_x86_64 in java.library.path
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1860)
at java.lang.Runtime.loadLibrary0(Runtime.java:870)
at java.lang.System.loadLibrary(System.java:1122)
at io.netty.util.internal.NativeLibraryUtil.loadLibrary(NativeLibraryUtil.java:38)
at io.netty.util.internal.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:349)
at io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:136)
at io.netty.channel.epoll.Native.loadNativeLibrary(Native.java:219)
at io.netty.channel.epoll.Native.(Native.java:57)
at io.netty.channel.epoll.Epoll.(Epoll.java:39)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:264)
at top.hserver.core.server.util.EpollUtil.check(EpollUtil.java:10)
at top.hserver.core.server.HServer.run(HServer.java:51)
at top.hserver.HServerApplication.startServer(HServerApplication.java:156)
at top.hserver.HServerApplication.run(HServerApplication.java:75)
at org.jmqtt.manage.HttpServer.start(HttpServer.java:17)
at org.jmqtt.broker.BrokerController.start(BrokerController.java:242)
at org.jmqtt.broker.BrokerStartup.start(BrokerStartup.java:62)
at org.jmqtt.broker.BrokerStartup.main(BrokerStartup.java:22)
Suppressed: java.lang.UnsatisfiedLinkError: no netty_transport_native_epoll_x86_64 in java.library.path
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1860)
at java.lang.Runtime.loadLibrary0(Runtime.java:870)
at java.lang.System.loadLibrary(System.java:1122)
at io.netty.util.internal.NativeLibraryUtil.loadLibrary(NativeLibraryUtil.java:38)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at io.netty.util.internal.NativeLibraryLoader$1.run(NativeLibraryLoader.java:369)
at java.security.AccessController.doPrivileged(Native Method)
at io.netty.util.internal.NativeLibraryLoader.loadLibraryByHelper(NativeLibraryLoader.java:361)
at io.netty.util.internal.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:339)
... 14 common frames omitted
2020-12-29 15:06:59.483 DEBUG PID:[ 30499] 线程名: [ main] i.n.util.internal.NativeLibraryLoader [ 340] : Successfully loaded the library /tmp/libnetty_transport_native_epoll_x86_645207355296361753854.so
2020-12-29 15:06:59.492 INFO PID:[ 30499] 线程名: [ main] top.hserver.core.server.HServer [ 67] : HServer 启动完成

默认消息存储中存在查找BUG

org.jmqtt.store.memory.DefaultFlowMessageStore#containSendMsg

return this.sendCache.get(clientId).contains(msgId);
->
return this.sendCache.get(clientId).containsKey(msgId);

cleansession unSubscribe old subscriptions

ConnectProcessor.java的createNewClientSession在subscriptionStore.clearSubscription时是不是也该subscriptionMatcher.unSubscribe,否则subscriptionMatcher与subscriptionStore的订阅不一致了。

Will Message 没有发送出去

请教一下, 以下是我debug过程中的分析, 请指正是否正确?
ConnectProcessor processRequest(ChannelHandlerContext ctx, MqttMessage mqttMessage) 中保存will message 到 willMessageStore,但是没有地方保存在will topic 到subscriptionMatcher中。 所以在发送will message 的时候, subscriptionMatcher match 不到对应的will topic的Subscription。从而导致will message 发送不成功
Set subscriptions = subscriptionMatcher.match((String) message.getHeader(MessageHeader.TOPIC));

打包报错

指定目录执行打包命令后:

[ERROR] Failed to execute goal on project jmqtt-broker: Could not resolve dependencies for project org.jmqtt:jmqtt-broker:jar:3.0.0: Failed to collect de
pendencies at org.jmqtt:jmqtt-mqtt:jar:3.0.0: Failed to read artifact descriptor for org.jmqtt:jmqtt-mqtt:jar:3.0.0: Could not transfer artifact org.jmqt
t:jmqtt-mqtt:pom:3.0.0 from/to maven-default-http-blocker (http://0.0.0.0/): transfer failed for http://0.0.0.0/org/jmqtt/jmqtt-mqtt/3.0.0/jmqtt-mqtt-3.0
.0.pom: Connect to 0.0.0.0:80 [/0.0.0.0] failed: Connection refused: connect -> [Help 1]

3.0 最新代码dowload下来运行 broker启动报异常

2021-09-13 16:59:25,036 main ERROR Could not create plugin of type class org.apache.logging.log4j.core.appender.rolling.DefaultRolloverStrategy for element DefaultRolloverStrategy: java.nio.file.InvalidPathException: Illegal char <:> at index 5: ${ctx:log_root}/${ctx:app_name} java.nio.file.InvalidPathException: Illegal char <:> at index 5: ${ctx:log_root}/${ctx:app_name}
at sun.nio.fs.WindowsPathParser.normalize(WindowsPathParser.java:182)
at sun.nio.fs.WindowsPathParser.parse(WindowsPathParser.java:153)
at sun.nio.fs.WindowsPathParser.parse(WindowsPathParser.java:77)
at sun.nio.fs.WindowsPath.parse(WindowsPath.java:94)
at sun.nio.fs.WindowsFileSystem.getPath(WindowsFileSystem.java:255)
at java.nio.file.Paths.get(Paths.java:84)
at org.apache.logging.log4j.core.appender.rolling.action.AbstractPathAction.getBasePath(AbstractPathAction.java:106)
at org.apache.logging.log4j.core.appender.rolling.action.AbstractPathAction.toString(AbstractPathAction.java:160)
at org.apache.logging.log4j.core.config.Node.toString(Node.java:153)
at org.apache.logging.log4j.core.config.plugins.visitors.PluginElementVisitor.visit(PluginElementVisitor.java:69)
at org.apache.logging.log4j.core.config.plugins.util.PluginBuilder.injectFields(PluginBuilder.java:185)
at org.apache.logging.log4j.core.config.plugins.util.PluginBuilder.build(PluginBuilder.java:121)
at org.apache.logging.log4j.core.config.AbstractConfiguration.createPluginObject(AbstractConfiguration.java:1002)
at org.apache.logging.log4j.core.config.AbstractConfiguration.createConfiguration(AbstractConfiguration.java:942)
at org.apache.logging.log4j.core.config.AbstractConfiguration.createConfiguration(AbstractConfiguration.java:934)
at org.apache.logging.log4j.core.config.AbstractConfiguration.createConfiguration(AbstractConfiguration.java:934)
at org.apache.logging.log4j.core.config.AbstractConfiguration.doConfigure(AbstractConfiguration.java:552)
at org.apache.logging.log4j.core.config.AbstractConfiguration.initialize(AbstractConfiguration.java:241)
at org.apache.logging.log4j.core.config.AbstractConfiguration.start(AbstractConfiguration.java:288)
at org.apache.logging.log4j.core.LoggerContext.setConfiguration(LoggerContext.java:618)
at org.apache.logging.log4j.core.LoggerContext.reconfigure(LoggerContext.java:691)
at org.apache.logging.log4j.core.LoggerContext.setConfigLocation(LoggerContext.java:676)
at org.jmqtt.broker.BrokerStartup.start(BrokerStartup.java:61)
at org.jmqtt.broker.BrokerStartup.main(BrokerStartup.java:28)

jmqtt.sql缺少表定义

jmq_product_topic
jmq_device
表结构定义没有加进来
需要根据mapper定义自己建表

想请教几个实现的问题

1、如何持久化的消息呢?
一般的MQTT Broker,像EMQ、Mosquitto,都会使用基于内存的消息队列,我之前就一直想把EMQ的分布式内存消息队列Mnesia替换为kafka,但是不知道该如何操作。在EMQ的收费版白皮书里面,有一个双向箭头,不知道是不是替换了:
image
所以这个是用RabbitMQ来做的消息队列吗,怎么去实现的呢?
2、多消费者消费同一个主题
一般Message Broker只是用来转发给后面的业务,后面的业务必然是集群,必然有多个worker。如何保证多个消费者消费同一个主题的时候,只有一个消费者得到呢?就是一条消息只消费一次。EMQ3.0的集群共享订阅就支持这个负载均衡。
3、安全验证
这里是如何进行安全验证的呢,应该在哪一步来操作呢。我开始的想法是在Message Broker前面有一个安全校验层,但是这样做的话,Websocket就过不来Message Broker了,该怎么办呢。

希望大佬能给点建议,谢谢。

jmeter

jmeter关于websocket的压测脚本可以提供下吗

runbroker.cmd throws error

Getting the following error when trying to execute runbroker.cmd command

Usage: java [-options] class [args...]
           (to execute a class)
   or  java [-options] -jar jarfile [args...]
           (to execute a jar file)
where options include:
    -d32          use a 32-bit data model if available
    -d64          use a 64-bit data model if available
    -server       to select the "server" VM
                  The default VM is server.

    -cp <class search path of directories and zip/jar files>
    -classpath <class search path of directories and zip/jar files>
                  A ; separated list of directories, JAR archives,
                  and ZIP archives to search for class files.
    -D<name>=<value>
                  set a system property
    -verbose:[class|gc|jni]
                  enable verbose output
    -version      print product version and exit
    -version:<value>
                  Warning: this feature is deprecated and will be removed
                  in a future release.
                  require the specified version to run
    -showversion  print product version and continue
    -jre-restrict-search | -no-jre-restrict-search
                  Warning: this feature is deprecated and will be removed
                  in a future release.
                  include/exclude user private JREs in the version search
    -? -help      print this help message
    -X            print help on non-standard options
    -ea[:<packagename>...|:<classname>]
    -enableassertions[:<packagename>...|:<classname>]
                  enable assertions with specified granularity
    -da[:<packagename>...|:<classname>]
    -disableassertions[:<packagename>...|:<classname>]
                  disable assertions with specified granularity
    -esa | -enablesystemassertions
                  enable system assertions
    -dsa | -disablesystemassertions
                  disable system assertions
    -agentlib:<libname>[=<options>]
                  load native agent library <libname>, e.g. -agentlib:hprof
                  see also, -agentlib:jdwp=help and -agentlib:hprof=help
    -agentpath:<pathname>[=<options>]
                  load native agent library by full pathname
    -javaagent:<jarpath>[=<options>]
                  load Java programming language agent, see java.lang.instrument
    -splash:<imagepath>
                  show splash screen with specified image
See http://www.oracle.com/technetwork/java/javase/documentation/index.html for more details.

The build was successful but unable to run the broker. Is there anything that we may be missing in execution steps?

Exception in thread "main" 正在进行过多的发布 (32202)

image

producer:

Exception in thread "main" 正在进行过多的发布 (32202)
at org.eclipse.paho.client.mqttv3.internal.ClientState.send(ClientState.java:524)
at org.eclipse.paho.client.mqttv3.internal.ClientComms.internalSend(ClientComms.java:161)
at org.eclipse.paho.client.mqttv3.internal.ClientComms.sendNoWait(ClientComms.java:191)
at org.eclipse.paho.client.mqttv3.MqttAsyncClient.publish(MqttAsyncClient.java:1251)
at org.eclipse.paho.client.mqttv3.MqttClient.publish(MqttClient.java:570)
at org.jmqtt.java.Producer.main(Producer.java:20)

broker:
2021-09-13 17:22:31,488 [/// - ] DEBUG remotingLog - [Remoting] -> receive mqtt code,type:12,name:PINGREQ
2021-09-13 17:22:50,872 [/// - ] DEBUG remotingLog - [Remoting] -> receive mqtt code,type:12,name:PINGREQ
2021-09-13 17:23:31,488 [/// - ] DEBUG remotingLog - [Remoting] -> receive mqtt code,type:12,name:PINGREQ
2021-09-13 17:23:50,887 [/// - ] DEBUG remotingLog - [Remoting] -> receive mqtt code,type:12,name:PINGREQ
2021-09-13 17:24:31,488 [/// - ] DEBUG remotingLog - [Remoting] -> receive mqtt code,type:12,name:PINGREQ
2021-09-13 17:24:50,900 [/// - ] DEBUG remotingLog - [Remoting] -> receive mqtt code,type:12,name:PINGREQ

akka cluster 不生效

本地启动两个节点,A tcp 端口1883 , canonical.port = 25251 ,B tcp 端口2883 canonical.port = 25252 ,设备链接A 节点,通过B 节点发消息到设备订阅的主题,A节点没有收到发送的消息的事件

PublishProcessor里面的processQos1在高并发情况下有bug

在高并发情况下,其他线程发送innerMsg时会生成新的messageId,但是发送PubAck需要使用源messageId,因此该地方需要使用变量保存一下源messageId,修改如下:

    int originMessageId = innerMsg.getMsgId();
    processMessage(innerMsg);
    log.debug("[PubMessage] -> Process qos1 message,clientId={}",innerMsg.getClientId());
    MqttPubAckMessage pubAckMessage = MessageUtil.getPubAckMessage(originMessageId);

同一个Client在不同的Broker进行Connect

大佬早上好,
同一个Client在不同的Broker进行Connect时, 是不是应该支持互踢功能, 该功能是不是预备在org.jmqtt.broker.processor.ConnectProcessor.void newClientNotify(ClientSession clientSession) 做出集群各个broker通知进行维护?

cluster 如何配置

假设192.168.1.1 , 192.168.1.2 ,192.168.1.3
192.168.1.1 配置cluster
currentNodeIp="192.168.1.1"
nodeName="node_1"
groupServerPort=8880

init group nodes,ip1:port1;ip2:port2

groupNodes="192.168.1.1:8080;192.168.1.2:8080;192.168.1.3:8080"
这样配置是否有无,另外cluster有无消息备份及故障转移特性?

akka在这里起的什么作用?

看代码应该都是本地消息流转,没看出怎么用akka的集群能力?
作者想用akka集群方式是想保存pub/sub的订阅关系还是message?

启动时连接mysql5.7无反应

将连接字符串修改为 jdbc:mysql://127.0.0.1:3306/jmqtt?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8 即可正常启动

idle事件未处理

假设客户端建立了tcp连接channelActive, 但未进行connect, 看代码里没有处理idle事件, 是不是会造成client一直连着,但是又不是有效的mqtt客户端?

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.