Coder Social home page Coder Social logo

daoshenzzg / socket-mqtt Goto Github PK

View Code? Open in Web Editor NEW
599.0 28.0 250.0 1.76 MB

基于Netty+MQTT的高性能推送服务框架。支持普通Socket、MQTT、MQTT web socket协议。非常方便接入上层业务实现推送业务。

License: Apache License 2.0

Java 100.00%
java netty mqtt socket websocket iot rpc

socket-mqtt's Introduction

socket-mqtt: Netty4.x + MQTT

这是一个基于Netty4.x + MQTT实现的Push推送基础框架。相比于原生Netty,
socket-mqtt:

  • 为C/S模式开发封装简单统一的编程模式
  • 简单高性能的代码
  • 统一的连接管理方案
  • 统一的线程管理方案
  • 网络基础问题的解决与支持:如心跳保持、压缩解压缩、编码与解码、加密与解密等
  • 各种网络参数、连接池实现、监听器实现等可配置可替换
  • 可实现对等集群(见负载均衡方案
  • 提供数据统计/监控组件
  • 支持普通socket、MQTT、MQTT web socket协议及自定义协议

项目结构

  • codec: 封装编码与解码
  • compression: 封装压缩与解压缩
  • count: 封装统计信息
  • database: 基于hsql的内存数据库
  • encrypt: 封装加密与解密
  • future: 封装同步和异步调用
  • listener: 封装事件监听,包括消息、通道、异常三类事件监听器
  • service: 封装C/S模型、通道、心跳管理、消息分发等核心模块

Linux内核参数配置

# 允许回收TCP连接  
net.ipv4.tcp_tw_reuse = 0  
net.ipv4.tcp_tw_recycle = 0  

# TCP 缓冲区内存  
net.ipv4.tcp_rmem = 4096 87380 8388608  
net.ipv4.tcp_wmem = 4096 87380 8388608  
net.ipv4.tcp_mem = 94500000 915000000 927000000    
# ulimits 优化  
fs.file-max = 1065353  
kernel.pid_max = 65536  
*  soft      nofile 655360  *  hard      nofile 655360  

压测报告

MQTT协议-上下行消息处理能力压测

单Broker8核16G,支持44万连接(进程启动内存5G、开启心跳);1万客户端 单消息1024B 下行tps: 16万+; 4000客户端 Publish 单消息1024B 上行tps: 17万+,千兆网卡流量基本打满。

自定义协议-单机连接数支撑能力压测

单Broker8核16G开启心跳机制,客户端每30秒发送一次心跳,支持65万连接,稳定运行10分钟后,系统负载不到2个,服务端毫无压力。由于测试资源限制,理论上可以支持百万连接。

MQTT协议-消息下行能力

1万Clients订阅的消息下行能力 对应下行负载情况

MQTT协议-消息上行能力

4000Clients订阅消息上行能力 对应上行负载情况

MQTT协议-查看连接数情况

查看连接数(telnet 10.43.204.61 8001; get status) 查看连接数(ss -l)

使用说明

各种测试类的源码在src/test/java/com/yb/socket包路径下:
包括:

  • 普通socket Server/Client
  • MQTT socket Server/Client
  • 带注册中心的普通socket/MQTT socket
  • 基于内存数据库的模拟订阅推送
  • 自定义协议 Server/Client

服务启动配置选项

Server server = new Server();  
// 设置Broker端口  
server.setPort(8000); // 设置启动信息统计。默认true  
server.setOpenCount(true);  
// 设置启用心跳功能。默认true  
server.setCheckHeartbeat(true);  
// 设置启动服务状态,默认端口8001。通过telnet server_ip 8001; get status查看服务信息  
server.setOpenStatus(true);  
// 服务状态端口。默认8001  
server.setStatusPort(8001);  
// 设置服务名称  
server.setServiceName("Demo");  
// 设置工作线程数量。默认CPU个数+1  
server.setWorkerCount(64);  
// 是否开户业务处理线程池。默认false  
server.setOpenExecutor(true);  
// 设置tcp no delay。默认true  
server.setTcpNoDelay(true);  
// 是否启用keepAlive。默认true  
server.setKeepAlive(true);  
// 自定义监听器,可处理相关事件  
server.addEventListener(new EchoMessageEventListener());  
// 设置Broker启动协议。SocketType.MQTT - MQTT协议; SocketType.NORMAL - 普通Socket协议;SocketType.MQTT_WS - MQTT web socket协议;  
server.setSocketType(SocketType.MQTT);  
// 绑定端口启动服务  
server.bind();  

MQTT web socket server DEMO

Server server = new Server();  
server.setPort(8000);  
server.addEventListener(new EchoMessageEventListener());  
server.setSocketType(SocketType.MQTT_WS);  
server.bind();  

//模拟推送  
String message = "this is a web socket message!";  
MqttRequest mqttRequest = new MqttRequest((message.getBytes()));  
while (true) {  
    if (server.getChannels().size() > 0) { 
        logger.info("模拟推送消息");     
        for (WrappedChannel channel : server.getChannels().values()) {  
            server.send(channel, "yb/notice/", mqttRequest);     
        }
    } 
    Thread.sleep(1000L);
}  

MQTT web socket client(浏览器)

可用在线mqtt测试:http://www.tongxinmao.com/txm/webmqtt.php  
Topic  Payload    Time   QoS  
yb/notice/ this is a web socket message!  2019-2-27 16:54:54 0  

Normal socket server DEMO

Server server = new Server();  
server.setPort(8000);  
server.addEventListener(new JsonEchoMessageEventListener());  
server.addChannelHandler("decoder", JsonDecoder::new);  
server.addChannelHandler("encoder", JsonEncoder::new);  
server.bind();  

//模拟推送  
JSONObject message = new JSONObject();  
message.put("action", "echo");  
message.put("message", "this is a normal socket message!");  

Request request = new Request();  
request.setSequence(0);  
request.setMessage(message);  
while (true) {  
    if (server.getChannels().size() > 0) {  
        logger.info("模拟推送消息");     
        for (WrappedChannel channel : server.getChannels().values()) {
            channel.send(request);
            Thread.sleep(5000L);
        }
    }
}  

Normal socket client DEMO

Client client = new Client();  
client.setIp("127.0.0.1");  
client.setPort(8000);  
client.setConnectTimeout(10000);  
client.addChannelHandler("decoder", JsonDecoder::new);  
client.addChannelHandler("encoder", JsonEncoder::new);  
client.connect();  

for (int i = 0; i < 2; i++) {  
    JSONObject message = new JSONObject();   
    message.put("action", "echo"); 
    message.put("message", "hello world!");  
    Request request = new Request();  
    request.setSequence(i); 
    request.setMessage(message);
    Response response = (Response) 
    client.sendWithSync(request, 3000);  
    logger.info("成功接收到同步的返回: '{}'.", response);
}  

client.shutdown();  

带注册中心 center DEMO

Server server = new Server();  
server.setPort(9000);  
server.setCheckHeartbeat(false);  
server.addChannelHandler("decoder", JsonDecoder::new);  
server.addChannelHandler("encoder", JsonEncoder::new);  
server.addEventListener(new com.yb.socket.center.CenterMockMessageEventListener());  
server.bind();  

带注册中心 server DEMO

Server server = new Server();  
server.setPort(8000);  
server.setCheckHeartbeat(false);  
server.setCenterAddr("127.0.0.1:9000,127.0.0.1:9010");  
server.addEventListener(new JsonEchoMessageEventListener());  
server.bind();  

带注册中心 client DEMO

Client client = new Client();  
client.setCheckHeartbeat(false);  
client.setCenterAddr("127.0.0.1:9000,127.0.0.1:9010");  
client.addChannelHandler("decoder", JsonDecoder::new);  
client.addChannelHandler("encoder", JsonEncoder::new);  
client.connect();  

JSONObject message = new JSONObject();  
message.put("action", "echo");  
message.put("message", "hello");  

for (int i = 0; i < 5; i++) {  
    Request request = new Request();  
    request.setSequence(i);
    request.setMessage(message);
    client.send(request);
    Thread.sleep(5000L);
}  

后续规划

  • 支持MQTT主题过滤机制
  • 支持SSL连接方式
  • 完整的QoS服务质量等级实现DEMO
  • 遗嘱消息, 保留消息及消息分发重试

压测工具

参考项目

socket-mqtt's People

Contributors

daoshenzzg avatar dependabot[bot] avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

socket-mqtt's Issues

心跳开启时,第二个tcp客户端连接不上

image
后面新建了一个类ServerIdleStateHandler并继承IdleStateHandler,添加注解@ChannelHandler.Sharable
,在com/yb/socket/service/server/Server.java中,将原来的IdleStateHandler换成ServerIdleStateHandler才解决。
问题二:注册中心这个怎么使用呢?有没有交流群或者文档之类的,是用tcp客户端连接注册注册中心吗?

这个项目是不是不完善

我用mqttfx测试了一下,除了连接,其他的发送,接收都不走
if (msg instanceof MqttMessage) {
MqttMessage message = (MqttMessage) msg;
MqttMessageType messageType = message.fixedHeader().messageType();
switch (messageType) {
case CONNECT:
this.connect(channel, (MqttConnectMessage) message);
break;
case PUBLISH:
this.publish(channel, (MqttPublishMessage) message);
break;
case SUBSCRIBE:
this.subscribe(channel, (MqttSubscribeMessage) message);
break;
case UNSUBSCRIBE:
this.unSubscribe(channel, (MqttUnsubscribeMessage) message);
break;
case PINGREQ:
this.pingReq(channel, message);
break;
case DISCONNECT:
this.disConnect(channel, message);
break;
default:
if (logger.isDebugEnabled()) {
logger.debug("Nonsupport server message type of '{}'.", messageType);
}
break;
}
}
这段代码

小白请教一个问题

您好 可以加一个 您的 qq 交流么,我是 android 开发的。临时过来 写这个 很多不太懂
另外一个问题就是 我看你这个 app 不是启动项把?

普通的client连接后关闭出现内存泄露

@service("ClientServiceImpl")
public class ClientServiceImpl implements IClientService {

public String messageSend(String ip,Integer port,String message){
    MsgDecoder decoder = new MsgDecoder();
    MsgEncoder encoder = new MsgEncoder();
    Client client=new Client();
    client.setIp(ip);
    client.setPort(port);
    client.setConnectTimeout(10000);
    client.setCheckHeartbeat(false);
    client.addChannelHandler("decoder", decoder);
    client.addChannelHandler("encoder", encoder);
    client.connect();
    Request request = new Request();
    request.setSequence(1);
    request.setMessage(message);
    Response response=null;
    try {
        response = client.sendWithSync(request, 30000);
        client.shutdown();
        return response.getResult().toString();
    }catch (Exception e){
        client.shutdown();
        log.error("消息发送失败:{},指令:{}",e.getMessage(),message);
        throw new RestfulEx(RestfulStatus.H500,"消息发送失败!");
    }finally {
        //强制回收内存
        client=null;
        decoder=null;
        encoder=null;
        request=null;
        response=null;
        System.gc();
    }
}

}

日志配置文件及位置

请问下 这个项目的日志是打印在哪里呢?我想测试,但没找到打印日志和日志配置文件

解决io.netty.util.IllegalReferenceCountException: refCnt: 0问题。

因为netty4之后BytBuf加入了计数器概念
我们在创建Server的时候开启了异步执行,如下图:
image

这个时候呢,在代码执行到ServerDispatchHandler.channelRead()这个方法时,里面有这么一句代码 eventDispatcher.dispatchMessageEvent(ctx, channel, msg);
image
这句代码的实现是这样的:
image
因为是异步的情况,在我们还没有读取ByteBuf数据时就已经先执行了如下图这句代码:
image
这时候ByteBuf已经被释放了,所以我们抛出了io.netty.util.IllegalReferenceCountException: refCnt: 0异常。
解决方法:
1. 改为同步执行
image
2. 修改作者代码
image

MQTT发布消息出现异常

// Server端默认使用业务处理线程池。
this.openExecutor = true;

在这种情况下,使用MQTT协议发布消息的时候,dispatchMessageEvent 的时候会出现传值异常,无法解析内容部分,引发IllegalReferenceCountException异常

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.