Go Packages for working with the MQTT 3.1.1 protocol.
$ go get github.com/256dpi/gomqtt
Go packages for working with the MQTT protocol
License: Apache License 2.0
Hi, gomqtt is a wonderful library and I want to use it in my project. However, we want to send alarm messages to the subscribers directly, not by publishing through an MQTT client. Since MemoryBackend.temporarySessions is a private member, I cannot use temporarySessions to look up subscriptions. Could another Publish(msg *packet.Message) function be added to help us publish messages directly?
goroutine 372856 [running]:
runtime.throw(0x2ae05d2, 0x1e)
runtime/panic.go:1116 +0x54 fp=0x400074b980 sp=0x400074b950 pc=0xf920b4
sync.throw(0x2ae05d2, 0x1e)
runtime/panic.go:1102 +0x30 fp=0x400074b9a0 sp=0x400074b980 pc=0xf92050
sync.(*Mutex).unlockSlow(0x40003694e0, 0x40ffffffff)
sync/mutex.go:196 +0xe4 fp=0x400074b9d0 sp=0x400074b9a0 pc=0xfcc3b4
sync.(*Mutex).Unlock(0x40003694e0)
sync/mutex.go:190 +0x64 fp=0x400074b9f0 sp=0x400074b9d0 pc=0xfcc2a4
runtime.call32(0x0, 0x2badf08, 0x400157c548)
runtime/asm_arm64.s:421 +0x74 fp=0x400074ba30 sp=0x400074b9f0 pc=0xfbf3c4
runtime.reflectcallSave(0x400074bba0, 0x2badf08, 0x400157c548, 0x8)
runtime/panic.go:881 +0x48 fp=0x400074ba60 sp=0x400074ba30 pc=0xf91868
runtime.runOpenDeferFrame(0x400073db00, 0x400157c500, 0x400074bbe0)
runtime/panic.go:855 +0x270 fp=0x400074bb20 sp=0x400074ba60 pc=0xf91740
panic(0x27b5c60, 0x3f91c90)
runtime/panic.go:969 +0x11c fp=0x400074bbe0 sp=0x400074bb20 pc=0xf91a3c
runtime.panicmem(...)
runtime/panic.go:212
runtime.sigpanic()
runtime/signal_unix.go:717 +0x3c8 fp=0x400074bc10 sp=0x400074bbe0 pc=0xfa89d8
github.com/256dpi/gomqtt/broker.(*Client).Closed(...)
github.com/256dpi/[email protected]/broker/client.go:301
github.com/256dpi/gomqtt/broker.(*MemoryBackend).Setup(0x4000369480, 0x4001648000, 0x4000024040, 0x1a, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0)
github.com/256dpi/[email protected]/broker/backend.go:198 +0xe38 fp=0x400074be10 sp=0x400074bc20 pc=0x246f828
github.com/256dpi/gomqtt/broker.(*Client).processConnect(0x4001648000, 0x400157c050, 0xf, 0x4001648000)
github.com/256dpi/[email protected]/broker/client.go:514 +0x138 fp=0x400074bec0 sp=0x400074be10 pc=0x2471d78
github.com/256dpi/gomqtt/broker.(*Client).processor(0x4001648000, 0x246a9ec, 0x4000022ec0)
github.com/256dpi/[email protected]/broker/client.go:325 +0xf4 fp=0x400074bf40 sp=0x400074bec0 pc=0x24710d4
github.com/256dpi/gomqtt/broker.(*Client).processor-fm(0x0, 0xfc18a8)
github.com/256dpi/[email protected]/broker/client.go:307 +0x28 fp=0x400074bf70 sp=0x400074bf40 pc=0x2474db8
gopkg.in/tomb%2ev2.(*Tomb).run(0x4001648098, 0x4000468050)
gopkg.in/[email protected]/tomb.go:163 +0x34 fp=0x400074bfc0 sp=0x400074bf70 pc=0x246a964
runtime.goexit()
runtime/asm_arm64.s:1148 +0x4 fp=0x400074bfc0 sp=0x400074bfc0 pc=0xfc18a4
created by gopkg.in/tomb%2ev2.(*Tomb).Go
gopkg.in/[email protected]/tomb.go:159 +0xc0
In both the client and the broker, if a crash occurs while the callback is executing, messages may already have been acknowledged even though they were never processed. We should send the acknowledgement only after the callback returns.
The URL currently hides a lot of important configuration. We could unpack it and expose the fields in the Config struct:
type Config struct {
	Dialer       *transport.Dialer
	Scheme       string
	Host         string
	Port         string
	Username     string
	Password     string
	Path         string
	ClientID     string
	CleanSession bool
	KeepAlive    string
	WillMessage  *packet.Message
	ValidateSubs bool
}
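As a rough sketch of what unpacking the URL might look like, the following uses only the standard net/url package. The Endpoint type and the unpack helper are hypothetical names for illustration and cover only the URL-derived fields of the proposed struct.

```go
package main

import (
	"fmt"
	"net/url"
)

// Endpoint holds the URL-derived fields of the proposed Config struct.
// It is a hypothetical type used only for this sketch.
type Endpoint struct {
	Scheme   string
	Host     string
	Port     string
	Username string
	Password string
	Path     string
}

// unpack splits a broker URL into its individual fields.
func unpack(raw string) (Endpoint, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return Endpoint{}, err
	}

	e := Endpoint{
		Scheme: u.Scheme,
		Host:   u.Hostname(),
		Port:   u.Port(),
		Path:   u.Path,
	}

	// Credentials are optional in the URL.
	if u.User != nil {
		e.Username = u.User.Username()
		e.Password, _ = u.User.Password()
	}

	return e, nil
}

func main() {
	e, err := unpack("tcp://user:secret@localhost:1883/mqtt")
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", e)
}
```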
Hi! Thanks for the good package!
I know these are only packages for working with the MQTT protocol. To use them in production for an HA application, we need features such as persistence and clustering (hard to do). Could you add a persistence package that implements the session interface on top of Redis instead of memory?
I have a question about the code in github.com\256dpi\gomqtt\topic\tree.go. The removeValue function overwrites the matched value with the last value, n.values[len(n.values)-1]. Is that correct?
func (n *node) removeValue(value interface{}) {
	for i, v := range n.values {
		if v == value {
			// remove without preserving order
			n.values[i] = n.values[len(n.values)-1]
			n.values = n.values[:len(n.values)-1]
			break
		}
	}
}
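For reference, the pattern in question is the common swap-remove idiom: the matched element is overwritten with the last element and the slice is shortened by one, so the element is removed in constant time at the cost of element order. A standalone sketch, using a free function instead of the node method (the function form is an adaptation made here for illustration):

```go
package main

import "fmt"

// removeValue deletes the first occurrence of value from values without
// preserving order: the match is overwritten with the last element and
// the slice is shortened by one.
func removeValue(values []interface{}, value interface{}) []interface{} {
	for i, v := range values {
		if v == value {
			values[i] = values[len(values)-1]
			return values[:len(values)-1]
		}
	}
	return values
}

func main() {
	values := []interface{}{"a", "b", "c", "d"}
	values = removeValue(values, "b")
	fmt.Println(values) // prints [a d c]
}
```

The value "b" is gone and "d" has moved into its slot, which is fine as long as callers do not rely on the order of the values.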
The string function skips the first-level value when i is 0. I think the purpose of string is to print the length of each leaf, so the first level should not be skipped.
func (n *node) string(i int) string {
	str := ""
	if i != 0 {
		str = fmt.Sprintf("%d", len(n.values))
	}
	for key, node := range n.children {
		str += fmt.Sprintf("\n| %s'%s' => %s", strings.Repeat(" ", i*2), key, node.string(i+1))
	}
	return str
}
func (s *Service) supervisor() error {
	...
	// try once to get a client
	client, resumed := s.connect(fail)
	if client == nil {
		continue
	}
	// do we need to close client explicitly?
	defer client.Close()
	...
}
It should be possible to configure the service to automatically resubscribe to topics if required.
I just realized that the current memory implementation queues offline messages alongside new messages. The reference implementation should queue the offline messages before queueing new messages to preserve the order within the topic.
Is there documentation on how to use it?
goroutine 17904 [IO wait, 1 minutes]:
internal/poll.runtime_pollWait(0x7faae0b8b978, 0x72, 0xffffffffffffffff)
/usr/local/go/src/runtime/netpoll.go:184 +0x55
internal/poll.(*pollDesc).wait(0xc006b32e98, 0x72, 0x1000, 0x1000, 0xffffffffffffffff)
/usr/local/go/src/internal/poll/fd_poll_runtime.go:87 +0x45
internal/poll.(*pollDesc).waitRead(...)
/usr/local/go/src/internal/poll/fd_poll_runtime.go:92
internal/poll.(*FD).Read(0xc006b32e80, 0xc006b86000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
/usr/local/go/src/internal/poll/fd_unix.go:169 +0x1cf
net.(*netFD).Read(0xc006b32e80, 0xc006b86000, 0x1000, 0x1000, 0x0, 0x0, 0x2)
/usr/local/go/src/net/fd_unix.go:202 +0x4f
net.(*conn).Read(0xc0066f6eb0, 0xc006b86000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
/usr/local/go/src/net/net.go:184 +0x68
bufio.(*Reader).fill(0xc006b19e00)
/usr/local/go/src/bufio/bufio.go:100 +0x103
bufio.(*Reader).Peek(0xc006b19e00, 0x2, 0x8, 0xc008e7dd40, 0x9211a4, 0x0, 0x0)
/usr/local/go/src/bufio/bufio.go:138 +0x4f
github.com/256dpi/gomqtt/packet.(*Decoder).Read(0xc006b74340, 0x0, 0x0, 0x0, 0x0)
/go/pkg/mod/github.com/256dpi/[email protected]/packet/stream.go:108 +0x7e
github.com/256dpi/gomqtt/transport.(*BaseConn).Receive(0xc006b47e90, 0x0, 0x0, 0x0, 0x0)
/go/pkg/mod/github.com/256dpi/[email protected]/transport/base_conn.go:69 +0xac
github.com/256dpi/gomqtt/broker.(*Client).processor(0xc006b02700, 0x45ea11, 0xc000b64760)
/go/pkg/mod/github.com/256dpi/[email protected]/broker/client.go:349 +0x223
gopkg.in/tomb%2ev2.(*Tomb).run(0xc006b027b0, 0xc006b74360)
/go/pkg/mod/gopkg.in/[email protected]/tomb.go:163 +0x2b
created by gopkg.in/tomb%2ev2.(*Tomb).Go
/go/pkg/mod/gopkg.in/[email protected]/tomb.go:159 +0xc7
I liked the idea of being able to cancel acceptance of a publish message; however, the current implementation doesn't match the documentation. The patch in #23 will enable that behaviour. If necessary, it would be easy to add an option to switch between the existing behaviour, which announces the packet after the ack/rel/comp sequence, and this one, which always announces on publish.
In addition, the second patch announces the client as online before any persistent session publish messages can be observed.
In packet/stream.go, the Encoder and Decoder reuse .buffer, which will cause a race condition when packets are read and written concurrently.
gomqtt is a very good library and is used in our projects; thanks to all gomqtt contributors. However, because some of the logic in our projects differs from gomqtt's, we cannot use some of gomqtt's public structs directly. For example, we need an MQTT client with an auto-reconnect feature and without any in-memory message cache, so we must implement it ourselves. To avoid copying code, we hope gomqtt/client/tracker can be made a public struct.
Some calls to the backend should maybe be synchronized. Currently, clients can call Setup() and Terminate() at the same time, which has caused issues due to the missing synchronization. Maybe we just need synchronization for the beginning and end of a connection.
Use standard log interface to support levels and fields, e.g. logrus.