Comments (21)
@mrhb6006 @thedevop @mochi-co According to MQTT spec 3.1.2.4 Clean Session , if the client does not expire, we should retain the client's subscriptions upon reconnection. However, I tested Mosquitto, and it does not support this feature. Therefore, I think making this an optional capability is a good idea.
from server.
@werbenhu , we current do retain client's subscription
Lines 495 to 501 in 5058333
We then clean up the old client:
Line 505 in 5058333
But will not unsubscribe topics if the client is taken over.
Lines 1265 to 1267 in 5058333
I have tested with Paho client, and it does retain the subscriptions. However, the messages are only stored if QOS > 0.
from server.
i use this pkg at production, plz help
from server.
@mrhb6006 I'm too busy at the moment. I will take a look at this issue as soon as I have some free time.
from server.
@mrhb6006 What is the version of the library in your app?
from server.
@mrhb6006 , in order for the message to be saved, the following conditions need to be met:
- Client session is not expired (met based on above)
- Client subscribed to the topic with QOS > 0
- Message published with QOS > 0
Can you check what QOS was used for subscription and publishing?
from server.
@thedevop
all conditions are met
when publish with real client it is ok
but when i use inline client it is not
from server.
@mrhb6006 What is the version of the library in your app?
last version
from server.
some
tnx
I'm waiting for you
from server.
Can you elaborate your steps as an inline client would not experience disconnect.
from server.
@mrhb6006 Take a look at #354 to see if it resolves your issue.
from server.
@thedevop The testing environment I'm in right now is indeed, as you said, without any issues. I will conduct further tests when I return to the previous environment in a few days.
from server.
@thedevop you are right. I tested it today, and I couldn't reproduce the issue. It might have been my mistake earlier.
from server.
@mrhb6006 , can you re-test if a client retains subscription when it disconnect/reconnect?
Can you also re-test the saved messages, make sure the following conditions are met:
- Client session is not expired (met based on above)
- Client subscribed to the topic with QOS > 0
- Message published with QOS > 0
from server.
@mrhb6006 , can you re-test if a client retains subscription when it disconnect/reconnect?
Can you also re-test the saved messages, make sure the following conditions are met:
- Client session is not expired (met based on above)
- Client subscribed to the topic with QOS > 0
- Message published with QOS > 0
yes all conditions are met.
i use inline client for publish and message not save at session
from server.
@thedevop you are right. I tested it today, and I couldn't reproduce the issue. It might have been my mistake earlier.
test with inline client publish ?
from server.
@mrhb6006 I will test this scenario later.
from server.
@mrhb6006 I tested it, and it's OK. Client A subscribes to the 'mochi' topic with QoS 1, then disconnects. Client B connects, publishes a message to the 'trigger' topic, triggering an inline publish to send a message. When Client A reconnects, it can receive the content sent by Client B. You can refer to my 'main.go' below for details. The client I used is mqttx.
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileContributor: mochi-co
package main
import (
"bytes"
"flag"
"log"
"os"
"os/signal"
"syscall"
mqtt "github.com/mochi-mqtt/server/v2"
"github.com/mochi-mqtt/server/v2/listeners"
"github.com/mochi-mqtt/server/v2/packets"
)
// Options contains configuration settings for the hook.
type MyHookOptions struct {
Server *mqtt.Server
}
// AllowHook is an authentication hook which allows connection access
// for all users and read and write access to all topics.
type MyHook struct {
mqtt.HookBase
config *MyHookOptions
}
func (h *MyHook) Init(config any) error {
h.Log.Info("initialised")
if _, ok := config.(*MyHookOptions); !ok && config != nil {
return mqtt.ErrInvalidConfigType
}
h.config = config.(*MyHookOptions)
if h.config.Server == nil {
return mqtt.ErrInvalidConfigType
}
return nil
}
// ID returns the ID of the hook.
func (h *MyHook) ID() string {
return "allow-all-auth"
}
// Provides indicates which hook methods this hook provides.
func (h *MyHook) Provides(b byte) bool {
return bytes.Contains([]byte{
mqtt.OnConnectAuthenticate,
mqtt.OnACLCheck,
mqtt.OnPublish,
}, []byte{b})
}
// OnConnectAuthenticate returns true/allowed for all requests.
func (h *MyHook) OnConnectAuthenticate(cl *mqtt.Client, pk packets.Packet) bool {
return true
}
// OnACLCheck returns true/allowed for all checks.
func (h *MyHook) OnACLCheck(cl *mqtt.Client, topic string, write bool) bool {
return true
}
func (h *MyHook) OnPublish(cl *mqtt.Client, pk packets.Packet) (packets.Packet, error) {
h.Log.Info("received from client", "client", cl.ID, "payload", string(pk.Payload))
pkx := pk
if string(pk.TopicName) == "trigger" {
h.config.Server.Publish("mochi", pk.Payload, false, 1);
}
return pkx, nil
}
func main() {
tcpAddr := flag.String("tcp", ":1883", "network address for TCP listener")
flag.Parse()
sigs := make(chan os.Signal, 1)
done := make(chan bool, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigs
done <- true
}()
server := mqtt.New(&mqtt.Options{
InlineClient: true,
})
_ = server.AddHook(new(MyHook), &MyHookOptions{
Server: server,
})
tcp := listeners.NewTCP("t1", *tcpAddr, nil)
err := server.AddListener(tcp)
if err != nil {
log.Fatal(err)
}
go func() {
err := server.Serve()
if err != nil {
log.Fatal(err)
}
}()
<-done
server.Log.Warn("caught signal, stopping...")
_ = server.Close()
server.Log.Info("main.go finished")
}
from server.
@mrhb6006 I tested it, and it's OK. Client A subscribes to the 'mochi' topic with QoS 1, then disconnects. Client B connects, publishes a message to the 'trigger' topic, triggering an inline publish to send a message. When Client A reconnects, it can receive the content sent by Client B. You can refer to my 'main.go' below for details. The client I used is mqttx.
// SPDX-License-Identifier: MIT // SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co // SPDX-FileContributor: mochi-co package main import ( "bytes" "flag" "log" "os" "os/signal" "syscall" mqtt "github.com/mochi-mqtt/server/v2" "github.com/mochi-mqtt/server/v2/listeners" "github.com/mochi-mqtt/server/v2/packets" ) // Options contains configuration settings for the hook. type MyHookOptions struct { Server *mqtt.Server } // AllowHook is an authentication hook which allows connection access // for all users and read and write access to all topics. type MyHook struct { mqtt.HookBase config *MyHookOptions } func (h *MyHook) Init(config any) error { h.Log.Info("initialised") if _, ok := config.(*MyHookOptions); !ok && config != nil { return mqtt.ErrInvalidConfigType } h.config = config.(*MyHookOptions) if h.config.Server == nil { return mqtt.ErrInvalidConfigType } return nil } // ID returns the ID of the hook. func (h *MyHook) ID() string { return "allow-all-auth" } // Provides indicates which hook methods this hook provides. func (h *MyHook) Provides(b byte) bool { return bytes.Contains([]byte{ mqtt.OnConnectAuthenticate, mqtt.OnACLCheck, mqtt.OnPublish, }, []byte{b}) } // OnConnectAuthenticate returns true/allowed for all requests. func (h *MyHook) OnConnectAuthenticate(cl *mqtt.Client, pk packets.Packet) bool { return true } // OnACLCheck returns true/allowed for all checks. func (h *MyHook) OnACLCheck(cl *mqtt.Client, topic string, write bool) bool { return true } func (h *MyHook) OnPublish(cl *mqtt.Client, pk packets.Packet) (packets.Packet, error) { h.Log.Info("received from client", "client", cl.ID, "payload", string(pk.Payload)) pkx := pk if string(pk.TopicName) == "trigger" { h.config.Server.Publish("mochi", pk.Payload, false, 1); } return pkx, nil } func main() { tcpAddr := flag.String("tcp", ":1883", "network address for TCP listener") flag.Parse() sigs := make(chan os.Signal, 1) done := make(chan bool, 1) signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) go func() { <-sigs done <- true }() server := mqtt.New(&mqtt.Options{ InlineClient: true, }) _ = server.AddHook(new(MyHook), &MyHookOptions{ Server: server, }) tcp := listeners.NewTCP("t1", *tcpAddr, nil) err := server.AddListener(tcp) if err != nil { log.Fatal(err) } go func() { err := server.Serve() if err != nil { log.Fatal(err) } }() <-done server.Log.Warn("caught signal, stopping...") _ = server.Close() server.Log.Info("main.go finished") }
i mean use server.Publish("trigger",body,2) for publish
dont work for me
from server.
@mrhb6006 I have also tested the situation of subscribing and publishing to the QoS2 topic, and it works fine. You can modify my main.go to conduct the test.
from server.
Can this be closed?
from server.
Related Issues (20)
- After enabled badger, the vlog file up to 700M one day and 4GB one week HOT 9
- Race condition when running the redis example HOT 4
- 遍历Clients时如何判断当前Client是否为Disconected状态 HOT 3
- 作者您好,请帮忙关注一下这个问题 HOT 3
- Hi, what is the simplest way to make messages can be restored when server cut off? HOT 5
- [badgerdb] vlog growing unbounded - consider adding GC and exposing options HOT 6
- The badge still getting vlog file keep growing infirnity HOT 7
- How to send topics posted by specific users only to specific subscribed users? HOT 11
- Does peddle perssistant released? HOT 5
- MQTTX cannot use Topic Alias. MQTT5.0 主题别名发送卡住,无法发布主题别名的消息 HOT 2
- How to use the new pessistent hook? HOT 1
- Reload auth fIle on the run HOT 2
- InlineClient模式下服务端订阅问题,inline subscribers do not receive messages HOT 5
- Merge 2 version of storm HOT 4
- Add Support for Disconnect With Will Message Reason Code
- Logging Level is not Configurable Via File Configuration
- Persistence storage did not work with SetCleanSession(false) HOT 3
- Don't allow inheriting session unless username matches HOT 5
- MessageExpiry Hook HOT 1
- OnConnectAuthenticate cannot specify an error code (like Client Identifier not valid)
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from server.