Coder Social home page Coder Social logo

vibe-mqtt's Introduction

Actions Status Dub downloads License Latest version

vibe-mqtt

MQTT broker client library written completely in D.

API documentation

MQTT protocol version supported: 3.1.1

Depends on: vibe.d

Tested on:

Supported MQTT 3.1.1 features:

  • QoS0, QoS1 and QoS2 messages handling
  • Authentication
  • Session state storage (currently in memory only - #20)
  • Sending retain messages
  • Async API (publish blocks if send queue is full)
  • Data agnostic
  • Message ordering
  • KeepAlive mechanism support (PingReq/PingResp) (#11)
  • Auto reconnect to broker (#15)
  • TLS/SSL (#16)
  • On subscribe topics validation (#17)
  • Last Will and Testament (LWT) (#21)
  • Delivery retry (#14)

Pull Requests are welcome, don't be shy ;)

Usage

Example code can be found in the examples directory.

Publisher

Simple publisher which connects to the MQTT broker and periodically sends a message. Implicitly it connects to 127.0.0.1:1883

auto settings = Settings();
settings.clientId = "test publisher";

auto mqtt = new MqttClient(settings);
mqtt.connect();

auto publisher = runTask(() {
        while (mqtt.connected) {
            mqtt.publish("chat", "I'm still here!!!");
            sleep(2.seconds());
        }
    });

Subscriber

Simple subscriber which connects to the MQTT broker, subscribes to the topic and outputs each received message. Implicitly it connects to 127.0.0.1:1883

auto settings = Settings();
settings.clientId = "test subscriber";
settings.onConnAck = (scope MqttClient ctx, in ConnAck packet)
{
    ctx.subscribe(["chat"]);
};
settings.onPublish = (scope MqttClient ctx, in Publish packet)
{
    writeln("chat: ", cast(string)packet.payload);
};

auto mqtt = new MqttClient(settings);
mqtt.connect();

vibe-mqtt's People

Contributors

crimaniak avatar ghost91- avatar gizmomogwai avatar kubo39 avatar renezwanenburg avatar tchaloupka avatar yazd avatar zoadian avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar

vibe-mqtt's Issues

No way to hook on disconnect event

I do inherits from MqttClient and want to clean some vars on disconnect. Now I can't do it because disconnect() is final and there is no any hooks like onDisconnect().

PingReq and PingResp handling

Add possibility to automatically send PingReq and handle delivery (or not) of PingResp to check connection state.

Acquiring writer of already owned connection

I don't know it's problem in vibe or herre. Very simple loop: one task, one mqtt publisher, trying send messages in fix time steps

    auto publisherQ2 = runTask(()
    {
        while (mqtt.connected)
        {
            auto meas = ctrl.measures;
            mqtt.publish("measures", meas.serializeToPrettyJson, QoSLevel.QoS2);
        }
    });

Gets error:

Full error: core.exception.AssertError@../../../../.dub/packages/vibe-d-0.8.0/vibe-d/core/vibe/core/drivers/libevent2_tcp.d(424): Acquiring writer of already owned connection.
----------------
??:? _d_assert_msg [0x75c28e]
../../../../.dub/packages/vibe-d-0.8.0/vibe-d/core/vibe/core/drivers/libevent2_tcp.d:424 @safe void vibe.core.drivers.libevent2_tcp.Libevent2TCPConnection.acquireWriter() [0x6cae3a]
../../../../.dub/packages/vibe-d-0.8.0/vibe-d/core/vibe/core/drivers/libevent2_tcp.d:364 @safe ulong vibe.core.drivers.libevent2_tcp.Libevent2TCPConnection.write(const(ubyte[]), vibe.core.stream.IOMode) [0x6ca74d]
../../../../.dub/packages/vibe-d-0.8.0/vibe-d/core/vibe/core/stream.d:136 @safe void vibe.core.stream.OutputStream.write(const(ubyte[])) [0x6e91fa]
../../../../.dub/packages/vibe-mqtt-0.2.0-alpha.6/vibe-mqtt/source/mqttd/client.d:1114 @safe bool mqttd.client.MqttClient.send!(mqttd.messages.PubRel).send(mqttd.messages.PubRel) [0x6a71bd]
../../../../.dub/packages/vibe-mqtt-0.2.0-alpha.6/vibe-mqtt/source/mqttd/client.d:780 @safe void mqttd.client.MqttClient.onPubRec(mqttd.messages.PubRec) [0x69ca27]
../../../../.dub/packages/vibe-mqtt-0.2.0-alpha.6/vibe-mqtt/source/mqttd/client.d:952 @safe void mqttd.client.MqttClient.proccessData(const(ubyte[])) [0x69d5d2]
../../../../.dub/packages/vibe-mqtt-0.2.0-alpha.6/vibe-mqtt/source/mqttd/client.d:997 @safe void mqttd.client.MqttClient.listener() [0x69d8a0]
../../../../.dub/packages/vibe-d-0.8.0/vibe-d/core/vibe/core/core.d:620 void vibe.core.core.makeTaskFuncInfo!(@safe void delegate()).makeTaskFuncInfo(ref @safe void delegate()).callDelegate(vibe.core.core.TaskFuncInfo*) [0x6a36d7]
../../../../.dub/packages/vibe-d-0.8.0/vibe-d/core/vibe/core/core.d:1269 void vibe.core.core.CoreTask.run() [0x6b87fe]
??:? void core.thread.Fiber.run() [0x79f497]
??:? fiber_entryPoint [0x79f1fa]
??:? [0xffffffff]
Main thread exiting

if i sleep 100.msecs in task loop it's work several hours and throws too

MQTT Received PUBCOMP with unknown ID - PubComp(XXX, YYY)

Some time after the start of my programs I have messages like this

MQTT Received PUBCOMP with unknown ID - PubComp(XXX, YYY)

and

1497350637: Warning: Received PUBREL from One.pub for an unknown packet identifier XXX.

from mosquitto 1.4.11.
It's greatly slows down message dispatching to 3-5 minutes per message.
It's for QoS2, without QoS2 messages are lose.

here source code
https://github.com/deviator/drmi/tree/master/source/drmi/ps
https://github.com/deviator/drmi/tree/master/example/mqtt

Code crush on no_ack=True

I am learning mqtt. I wrote next consumer (from RabbitMQ examples):

import pika
conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
chan = conn.channel()

def callback(ch, method, propeties, body):
	print("I got: ", body)

chan.basic_consume(callback, queue='test', no_ack=True)
chan.start_consuming()

I am noob in MQ. So I simply change default port in D example to: settings.port = 5672;. When I am running publisher I am getting error: main(lbbt) WRN] MQTT ConAck not received, disconnecting. What I am doing wrong? The port number incorrect? Problem in no_ack=True or what?

disconnect() hangs up then listener() fibre in different task

In disconnect() method you first try to join() listener and then do _con.close(); But listener itself exits by _con.connected condition, so we have infinite loop when the listener runned in different task.
I move if(Task.getThis !is _listener) _listener.join; after _con.close() and my problem was resolved, but I think it need more correct solution here, like bool listenerPleaseExit flag to avoid situation when listener try to read from closed socket.

Inflight queue can cause reading to be blocked altogether when full

For example when we use QoS2 and the inflight queue is filled up by a very high ratio of the messages coming in, then no further messages are read from the connection.

There should probably be introduced some priority handling of currently processed messages to be able to finish them as soon as possible.
As now the PubRec is sent immediately when the new packet is received causing the queue fill even more.
When the queue is full, no other control messages can be processed which leads to a blocked subscriber.
Too old packets should be also removed from the queues. This is related to #14

Add possibility to save and load session state

Session is stored in memory, so it is not persistent in any way.
To make possible for users of this library to implement persistent storage, it is needed to add some mechanism to export and import session state.

This also depends on #14

A way to block publish instead of throwing old messages

Currently if the sending buffer is filled up, then it begin to throw away the old messages.
For QoS==0 it seems to be ok, but for higher levels it is probably wrong behaviour.

  • Check the desired behaviour in the MQTT specs
  • Maybe block the publish method call till there is a space for the new message

Message in log: protocol_header_corrupt

I installed rabbitmq 3.2.4 and enabled the mqtt plugin. Then I clone this repository and run dub -v --compiler=ldc2 inside examples/publisher. Running the application displays a range violation and I find the message protocol_header_corrupt in the log file. Any hint?

Message delivery retry

Specs - Message delivery retry, Message ordering

When a Client reconnects with CleanSession set to 0, both the Client and Server MUST re-send any unacknowledged PUBLISH Packets (where QoS > 0) and PUBREL Packets using their original Packet Identifiers.
This is the only circumstance where a Client or Server is REQUIRED to redeliver messages.

The DUP flag MUST be set to 1 by the Client or Server when it attempts to re-deliver a PUBLISH Packet . The DUP flag MUST be set to 0 for all QoS 0 messages

Incompatibility with new vibe-d:core

There are some compilation errors:

source/mqttd/client.d(190,19): Error: shared method vibe.core.sync.ManualEvent.emit is not callable using a non-shared object
source/mqttd/client.d(291,29): Error: cannot implicitly convert expression (createManualEvent()) of type LocalManualEvent to ManualEvent
source/mqttd/client.d(291,29): Error: cannot implicitly convert expression (createManualEvent()) of type LocalManualEvent to ManualEvent
source/mqttd/client.d(461,14): Error: shared method vibe.core.sync.ManualEvent.emit is not callable using a non-shared object
source/mqttd/client.d(461,14): Error: shared method vibe.core.sync.ManualEvent.emit is not callable using a non-shared object
source/mqttd/client.d(548,29): Error: function vibe.core.core.createTimer (void delegate() nothrow @safe callback) is not callable using argument types (void delegate() @safe)
source/mqttd/client.d(559,15): Error: incompatible types for ((this._con) is (null)): 'TCPConnection' and 'typeof(null)'
source/mqttd/client.d(393,21): Error: shared method vibe.core.sync.ManualEvent.emit is not callable using a non-shared object
source/mqttd/client.d(393,21): Error: shared method vibe.core.sync.ManualEvent.emit is not callable using a non-shared object
source/mqttd/client.d(597,17): Error: incompatible types for ((this._con) is (null)): 'TCPConnection' and 'typeof(null)'
source/mqttd/client.d(622,11): Error: incompatible types for ((this._con) !is (null)): 'const(TCPConnection)' and 'typeof(null)'
source/mqttd/client.d(667,28): Deprecation: function vibe.core.core.setTimer is deprecated - Use anothrow callback as argument to setTimer.
source/mqttd/client.d(690,30): Deprecation: function vibe.core.core.setTimer is deprecated - Use anothrow callback as argument to setTimer.
source/mqttd/client.d(715,33): Deprecation: function vibe.core.core.setTimer is deprecated - Use anothrow callback as argument to setTimer.
source/mqttd/client.d(710,29): Deprecation: function vibe.core.core.setTimer is deprecated - Use anothrow callback as argument to setTimer.
source/mqttd/client.d(402,14): Error: shared method vibe.core.sync.ManualEvent.emit is not callable using a non-shared object
source/mqttd/client.d(364,16): Error: none of the overloads of 'wait' are callable using a non-shared object, candidates are:
../../.dub/packages/vibe-core-1.0.0-beta.4/vibe-core/source/vibe/core/sync.d(896,6):        vibe.core.sync.ManualEvent.wait()
../../.dub/packages/vibe-core-1.0.0-beta.4/vibe-core/source/vibe/core/sync.d(905,6):        vibe.core.sync.ManualEvent.wait(int emit_count)
../../.dub/packages/vibe-core-1.0.0-beta.4/vibe-core/source/vibe/core/sync.d(907,6):        vibe.core.sync.ManualEvent.wait(Duration timeout, int emit_count)
source/mqttd/client.d(373,14): Error: shared method vibe.core.sync.ManualEvent.emit is not callable using a non-shared object

Proper Packet Id usage

Each time a Client sends a new packet (which has packetId) it MUST assign it a currently unused Packet Identifier. If a Client re-sends a particular Control Packet, then it MUST use the same Packet Identifier in subsequent re-sends of that packet. The Packet Identifier becomes available for reuse after the Client has processed the corresponding acknowledgement packet.

  • In the case of a QoS 1 PUBLISH this is the corresponding PUBACK
  • in the case of QoS 2 it is PUBCOMP.
  • For SUBSCRIBE or UNSUBSCRIBE it is the corresponding SUBACK or UNSUBACK.

Currently PacketId is just incremented and hasn't being checked against packets stored in the session. This can lead to some problemswith conflicting packet id.

Maintain client session state

Specs - Storing state

It is necessary for the Client Session state in order to provide Quality of Service guarantees. The Client MUST store Session state for the entire duration of the Session. A Session MUST last at least as long it has an active Network Connection.

Add possibility to connect with Clean Session flag on/off.

Partially done. Additional work needed to support QoS Level2 (#12)

QoS 2 level support

Specs - Quality of Service levels and protocol flows

This is the highest quality of service, for use when neither loss nor duplication of messages are acceptable. There is an increased overhead associated with this quality of service.

A QoS 2 message has a Packet Identifier in its variable header. The receiver of a QoS 2 PUBLISH Packet acknowledges receipt with a two-step acknowledgement process.

Sender

  • MUST assign an unused Packet Identifier when it has a new Application Message to publish.
  • MUST send a PUBLISH packet containing this Packet Identifier with QoS=2, DUP=0.
  • MUST treat the PUBLISH packet as “unacknowledged” until it has received the corresponding PUBREC packet from the receiver.
  • MUST send a PUBREL packet when it receives a PUBREC packet from the receiver. This PUBREL packet MUST contain the same Packet Identifier as the original PUBLISH packet.
  • MUST treat the PUBREL packet as “unacknowledged” until it has received the corresponding PUBCOMP packet from the receiver.
  • MUST NOT re-send the PUBLISH once it has sent the corresponding PUBREL packet.

Receiver

  • MUST respond with a PUBREC containing the Packet Identifier from the incoming PUBLISH Packet, having accepted ownership of the Application Message.
  • Until it has received the corresponding PUBREL packet, the Receiver MUST acknowledge any subsequent PUBLISH packet with the same Packet Identifier by sending a PUBREC. It MUST NOT cause duplicate messages to be delivered to any onward recipients in this case.
  • MUST respond to a PUBREL packet by sending a PUBCOMP packet containing the same Packet Identifier as the PUBREL.
  • After it has sent a PUBCOMP, the receiver MUST treat any subsequent PUBLISH packet that contains that Packet Identifier as being a new publication.

Wait for ConnAck before sending pending messages

If ConnAck is not received after sending Connect packet, client has to disconnect itself. Clients typically wait for a CONNACK Packet, However, if the Client exploits its freedom to send Control Packets before it receives a CONNACK, it might simplify the Client implementation as it does not have to police the connected state. The Client accepts that any data that it sends before it receives a CONNACK packet from the Server will not be processed if the Server rejects the connection. Clients need not wait for a CONNACK Packet to arrive from the Server.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.