emqtt's Introduction

emqtt

An MQTT client library and command-line tool implemented in Erlang, supporting MQTT v5.0, v3.1.1, and v3.1.

Getting started

As a Command Line Tool

Build

$ make

Optionally, you can disable QUIC support if you have problems compiling:

BUILD_WITHOUT_QUIC=1 make

Once compilation succeeds, you will get a script called emqtt in _build/emqtt/rel/emqtt/bin. The --help option shows what emqtt can do:

$ ./emqtt --help
Usage: emqtt pub | sub [--help]

emqtt pub is used to publish a single message on a topic and exit. emqtt sub is used to subscribe to a topic and print the messages that it receives.

Command-line Syntax

Options can have both short (single character) and long (string) option names.

A short option can have the following syntax:

-e arg         Single option 'e', argument "arg"

A long option can have the following syntax:

--example=arg  Single option 'example', argument "arg"
--example arg  Single option 'example', argument "arg"

Publish

Synopsis

./emqtt pub [-h [<host>]] [-p <port>] [-I <iface>]
            [-V [<protocol_version>]] [-u <username>]
            [-P <password>] [-C <clientid>] [-k [<keepalive>]]
            [-t <topic>] [-q [<qos>]] [-r [<retain>]]
            [--help <help>] [--will-topic <will_topic>]
            [--will-payload <will_payload>]
            [--will-qos [<will_qos>]]
            [--will-retain [<will_retain>]]
            [--enable-websocket [<enable_websocket>]]
            [--enable-quic [<enable_quic>]]
            [--enable-ssl [<enable_ssl>]]
            [--tls-version [<tls_version>]]
            [--CAfile <cafile>] [--cert <cert>] [--key <key>]
            [--payload <payload>]
            [--file <path/to/file>]
            [--repeat [<repeat>]]
            [--repeat-delay [<repeat_delay>]]

Options

-h, --host

  Specify the host to connect to, either a domain name or an IP address. Defaults to localhost.

-p, --port

  Specify the port to connect to. If not given, the default of 1883 for MQTT or 8883 for MQTT over TLS will be used.

-I, --iface

  Specify the network interface or IP address to use.

-V, --protocol-version

  Specify the MQTT protocol version used by the client. Can be v3.1, v3.1.1, or v5. Defaults to v5.

-u, --username

  Specify the username that can be used by the broker for authentication and authorization.

-P, --password

  Specify the password for the username.

-C, --clientid

  Specify the client identifier. If not given, the client identifier in the format emqtt-<Hostname>-<Random Hexadecimal String> will be automatically generated by emqtt_cli.

-k, --keepalive

  Specify the interval, in seconds, between PINGREQ packets sent to the broker. Defaults to 300 seconds.

-t, --topic

  Specify the MQTT topic you want to publish to. If the topic begins with $, you must enclose it in single quotes (') rather than double quotes ("). This is a required option.

-q, --qos

  Specify the quality of service for the message. Can be 0, 1, or 2. Defaults to 0.

-r, --retain

  Specify whether the message is a retained message. Defaults to false.

--payload

  Specify the application message to be published. This is a required option.

--repeat

  Specify the number of times the message will be repeatedly published. Defaults to 1.

--repeat-delay

  Specify the number of seconds to wait after the previous message was delivered before publishing the next. Defaults to 0, which means each repeated message is published as soon as the previous one has been sent.

--will-topic

  Specify the topic of the will message sent when the client disconnects unexpectedly.

--will-qos

  Specify the quality of service of the will message. Defaults to 0. This must be used in conjunction with --will-topic.

--will-retain

  Specify whether the will message is a retained message. Defaults to false. This must be used in conjunction with --will-topic.

--will-payload

  Specify the application message that will be stored by the broker and sent out if this client disconnects unexpectedly. This must be used in conjunction with --will-topic.

--enable-websocket

  Specify whether to enable the WebSocket transport. This option can't be used with --enable-ssl currently.

--enable-quic

  Use QUIC as the transport. This option can't be combined with --enable-ssl or --enable-websocket.

--enable-ssl

  Specify whether to enable the SSL/TLS transport. This option can't be used with --enable-websocket currently.

--tls-version

  Specify which TLS protocol version to use when communicating with the broker. Valid options are tlsv1.3, tlsv1.2, tlsv1.1, and tlsv1. Defaults to tlsv1.2.

--CAfile

  Specify the path to a file containing PEM encoded CA certificates. This must be used in conjunction with --enable-ssl.

--cert

  Specify the path to a file containing a PEM encoded certificate for this client, if required by the server. This must be used in conjunction with --enable-ssl.

--key

  Specify the path to a file containing a PEM encoded private key for this client, if required by the server. This must be used in conjunction with --enable-ssl.

Examples

Publish a simple message over a TCP connection

$ ./emqtt pub -t "hello" --payload "hello world"
Client emqtt-zhouzibodeMacBook-Pro-4623faa14d8256e9cb95 sent CONNECT
Client emqtt-zhouzibodeMacBook-Pro-4623faa14d8256e9cb95 sent PUBLISH (Q0, R0, D0, Topic=hello, Payload=...(11 bytes))
Client emqtt-zhouzibodeMacBook-Pro-4623faa14d8256e9cb95 sent DISCONNECT

Publish a simple message over a TLS connection

$ ./emqtt pub --enable-ssl=true -t "hello" --payload "hello world" --CAfile=certs/cacert.pem --cert=certs/client-cert.pem --key=certs/client-key.pem
Client emqtt-zhouzibodeMacBook-Pro-cec9489c26e3ed7a38eb sent CONNECT
Client emqtt-zhouzibodeMacBook-Pro-cec9489c26e3ed7a38eb sent PUBLISH (Q0, R0, D0, Topic=hello, Payload=...(11 bytes))
Client emqtt-zhouzibodeMacBook-Pro-cec9489c26e3ed7a38eb sent DISCONNECT

Publish a simple message over a WebSocket connection

$ ./emqtt pub --enable-websocket=true -p 8083 -t "hello" --payload "hello world"
Client emqtt-zhouzibodeMacBook-Pro-1e4677ab46cecf1298ac sent CONNECT
Client emqtt-zhouzibodeMacBook-Pro-1e4677ab46cecf1298ac sent PUBLISH (Q0, R0, D0, Topic=hello, Payload=...(11 bytes))
Client emqtt-zhouzibodeMacBook-Pro-1e4677ab46cecf1298ac sent DISCONNECT

Subscribe

Synopsis

./emqtt sub [-h [<host>]] [-p <port>] [-I <iface>]
            [-V [<protocol_version>]] [-u <username>]
            [-P <password>] [-C <clientid>] [-k [<keepalive>]]
            [-t <topic>] [-q [<qos>]] [--help <help>]
            [--will-topic <will_topic>]
            [--will-payload <will_payload>]
            [--will-qos [<will_qos>]]
            [--will-retain [<will_retain>]]
            [--enable-websocket [<enable_websocket>]]
            [--enable-quic [<enable_quic>]]            
            [--enable-ssl [<enable_ssl>]]
            [--tls-version [<tls_version>]]
            [--CAfile <cafile>] [--cert <cert>]
            [--key <key>]
            [--retain-as-publish [<retain_as_publish>]]
            [--retain-handling [<retain_handling>]]
            [--print [size]]

Options

-h, --host

  See also --host.

-p, --port

  See also --port.

-I, --iface

  See also --iface.

-V, --protocol-version

  See also --protocol-version.

-u, --username

  See also --username.

-P, --password

  See also --password.

-C, --clientid

  See also --clientid.

-k, --keepalive

  See also --keepalive.

-t, --topic

  Specify the MQTT topic you want to subscribe to. This is a required option.

-q, --qos

  Specify the maximum QoS level at which the broker can send application messages to the client. Defaults to 0.

--retain-as-publish

  Specify the Retain As Publish option in subscription options. Defaults to 0.

--retain-handling

  Specify the Retain Handling option in subscription options. Defaults to 0.

--print

  Use size to print just the number of received payload bytes. The payload is printed as a string if this option is not specified.

--will-topic

  See also --will-topic.

--will-qos

  See also --will-qos.

--will-retain

  See also --will-retain.

--will-payload

  See also --will-payload.

--enable-websocket

  See also --enable-websocket.

--enable-quic

  See also --enable-quic.

--enable-ssl

  See also --enable-ssl.

--tls-version

  See also --tls-version.

--CAfile

  See also --CAfile.

--cert

  See also --cert.

--key

  See also --key.

Examples

Create a non-shared subscription and receive "hello world"

$ ./emqtt sub -t "hello"
Client emqtt-zhouzibodeMacBook-Pro-1686fee6fdb99f674f2c sent CONNECT
Client emqtt-zhouzibodeMacBook-Pro-1686fee6fdb99f674f2c subscribed to hello
hello world

Create a shared subscription and receive "hello world"

$ ./emqtt sub -t '$share/group/hello'
Client emqtt-zhouzibodeMacBook-Pro-288e65bb3f4013d30249 sent CONNECT
Client emqtt-zhouzibodeMacBook-Pro-288e65bb3f4013d30249 subscribed to $share/group/hello
hello world

As a Dependency Library

Add to rebar3 project

Add to rebar.config

...
{deps, [{emqtt, {git, "https://github.com/emqx/emqtt", {tag, "v1.2.0"}}}]}.
...

Build

$ rebar3 compile

Data Types

option()

option() = {name, atom()} |
           {owner, pid()} |
           {host, host()} |
           {port, inet:port_number()} |
           {tcp_opts, [gen_tcp:option()]} |
           {ssl, boolean()} |
           {ssl_opts, [ssl:ssl_option()]} |
           {ws_path, string()} |
           {connect_timeout, pos_integer()} |
           {bridge_mode, boolean()} |
           {clientid, iodata()} |
           {clean_start, boolean()} |
           {username, iodata()} |
           {password, iodata()} |
           {proto_ver, v3 | v4 | v5} |
           {keepalive, non_neg_integer()} |
           {max_inflight, pos_integer()} |
           {retry_interval, pos_integer()} |
           {will_topic, iodata()} |
           {will_payload, iodata()} |
           {will_retain, boolean()} |
           {will_qos, qos()} |
           {will_props, properties()} |
           {auto_ack, boolean()} |
           {ack_timeout, pos_integer()} |
           {force_ping, boolean()} |
           {properties, properties()}

client()

client() = pid() | atom()

host()

host() = inet:ip_address() | inet:hostname()

properties()

properties() = #{'Payload-Format-Indicator' => 0..1,
                 'Message-Expiry-Interval' => 0..16#FFFFFFFF,
                 'Content-Type' => binary(),
                 'Response-Topic' => binary(),
                 'Correlation-Data' => binary(),
                 'Subscription-Identifier' => 1..16#FFFFFFF | [1..16#FFFFFFF, ...],
                 'Session-Expiry-Interval' => 0..16#FFFFFFFF,
                 'Assigned-Client-Identifier' => binary(),
                 'Server-Keep-Alive' => 0..16#FFFF,
                 'Authentication-Method' => binary(),
                 'Authentication-Data' => binary(),
                 'Request-Problem-Information' => 0..1,
                 'Will-Delay-Interval' => 0..16#FFFFFFFF,
                 'Request-Response-Information' => 0..1,
                 'Response-Information' => binary(),
                 'Server-Reference' => binary(),
                 'Reason-String' => binary(),
                 'Receive-Maximum' => 1..16#FFFF,
                 'Topic-Alias-Maximum' => 0..16#FFFF,
                 'Topic-Alias' => 1..16#FFFF,
                 'Maximum-QoS' => 0..1,
                 'Retain-Available' => 0..1,
                 'User-Property' => [{binary(), binary()}],
                 'Maximum-Packet-Size' => 1..16#FFFFFFFF,
                 'Wildcard-Subscription-Available' => 0..1,
                 'Subscription-Identifier-Available' => 0..1,
                 'Shared-Subscription-Available' => 0..1}

qos()

qos() = 0 | 1 | 2

qos_name()

qos_name() = qos0 | at_most_once |
             qos1 | at_least_once |
             qos2 | exactly_once
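
Each qos_name() atom is an alias for one of the numeric qos() levels. The mapping can be sketched as below; to_qos/1 and the module name are hypothetical helpers written for illustration and are not part of the emqtt API.

```erlang
-module(qos_name_example).
-export([to_qos/1]).

%% Map a qos_name() atom (or an already numeric qos()) to its numeric level.
%% Hypothetical helper for illustration only; not exported by emqtt.
to_qos(Q) when Q =:= 0; Q =:= 1; Q =:= 2 -> Q;
to_qos(qos0) -> 0;
to_qos(at_most_once) -> 0;
to_qos(qos1) -> 1;
to_qos(at_least_once) -> 1;
to_qos(qos2) -> 2;
to_qos(exactly_once) -> 2.
```

Any other value fails with a function_clause error, which keeps invalid QoS values from reaching a subopt() or pubopt() list unnoticed.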

topic()

topic() = binary()

payload()

payload() = iodata()

packet_id()

packet_id() = 0..16#FFFF

subopt()

subopt() = {rh, 0 | 1 | 2} |
           {rap, boolean()} |
           {nl, boolean()} |
           {qos, qos() | qos_name()}

pubopt()

pubopt() = {retain, boolean()} |
           {qos, qos() | qos_name()}

reason_code()

reason_code() = 0..16#FF

Exports

emqtt:start_link() -> {ok, Pid} | ignore | {error, Reason}

emqtt:start_link(Options) -> {ok, Pid} | ignore | {error, Reason}

  Types

    Pid = pid()

    Reason = term()

    Options = [option()]

Start MQTT client process with specified options. Options will be used in connecting and running.

The following options are available:

{name, Name}

If a name is provided, the gen_statem will be registered with this name. For details see the documentation for the first argument of gen_statem:start_link/4.

{owner, Pid}

The client process will send messages such as {disconnected, ReasonCode, Properties} to the owner process.

{host, Host}

The host of the MQTT server to connect to. Host can be a hostname or an IP address. Defaults to localhost.

{port, Port}

The port of the MQTT server to connect to. If not given, the default of 1883 for MQTT or 8883 for MQTT over TLS will be used.

{tcp_opts, Options}

Additional options for gen_tcp:connect/3.

{ssl, boolean()}

Enable SSL/TLS transport or not. Defaults to false.

{ssl_opts, Options}

Additional options for ssl:connect/3.

{ws_path, Path}

Path to the resource. Defaults to /mqtt.

{connect_timeout, Timeout}

The maximum time to wait for the connection to be established and for the server to return a CONNACK. Defaults to 60s.

{bridge_mode, boolean()}

Enable bridge mode or not. Defaults to false.

{clientid, ClientID}

Specify the client identifier. If not given, the client identifier will use the value assigned by the server in MQTT v5 or be automatically generated by internal code in MQTT v3.1/v3.1.1.

{clean_start, CleanStart}

Whether the server should discard any existing session and start a new session. Defaults to true.

{username, Username}

Username used by the server for authentication and authorization.

{password, Password}

Password used by the server for authentication and authorization.

{proto_ver, ProtocolVersion}

MQTT protocol version. Defaults to v4.

{keepalive, Keepalive}

Maximum time interval that is permitted to elapse between the point at which the Client finishes transmitting one MQTT Control Packet and the point it starts sending the next. It is replaced by the Server Keep Alive value if the MQTT server returns one.

{max_inflight, MaxInflight}

Maximum number of QoS 1 and QoS 2 packets in flight, that is, packets that have been sent but not yet acknowledged. Defaults to infinity, meaning no limit.

{retry_interval, RetryInterval}

Interval after which packets that have been sent but have not received a response are resent. Defaults to 30s.

{will_topic, WillTopic}

Topic of will message.

{will_payload, WillPayload}

Payload of will message.

{will_retain, WillRetain}

Whether the server should publish the will message as a retained message. Defaults to false.

{will_qos, WillQoS}

QoS of will message. Defaults to 0.

{will_props, WillProperties}

Properties of will message.

{auto_ack, boolean()}

If true (the default), the client process automatically sends an acknowledgement packet such as PUBACK when it receives a packet from the server. If false, the application decides what to do.

{ack_timeout, AckTimeout}

Maximum time to wait for a reply message. Defaults to 30s.

{force_ping, boolean()}

If false (the default), the ping packet is skipped whenever any other packet was sent during the keep-alive interval. If true, the ping packet is sent every interval regardless.

{properties, Properties}

Properties of CONNECT packet.
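
The options above combine into a single list passed to start_link/1. The following is a minimal sketch of a TLS connection with peer verification. The module and function names are hypothetical, the host is assumed to be a hostname string rather than an IP tuple, and verify, cacertfile and server_name_indication are standard options of the OTP ssl application rather than emqtt options.

```erlang
-module(emqtt_tls_example).
-export([tls_options/3, connect/3]).

%% Build the option list for emqtt:start_link/1 (pure function, no I/O).
%% ASSUMPTION: Host is a hostname string; CaFile is a path to PEM-encoded CA certificates.
tls_options(Host, Port, CaFile) ->
    [{host, Host},
     {port, Port},
     {ssl, true},
     {ssl_opts, [{verify, verify_peer},             %% reject certificates that fail validation
                 {cacertfile, CaFile},              %% trusted CA certificates
                 {server_name_indication, Host}]}]. %% hostname used for SNI and identity checks

%% Start the client process, then connect, using the calls documented in this README.
connect(Host, Port, CaFile) ->
    {ok, Client} = emqtt:start_link(tls_options(Host, Port, CaFile)),
    case emqtt:connect(Client) of
        {ok, _ConnAckProps} -> {ok, Client};
        {error, Reason}     -> {error, Reason}
    end.
```

Because start_link/1 links the client process to the caller, a failed connection may also deliver an exit signal to the caller; a caller that needs to survive such failures would typically trap exits or start the client under a supervisor.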

emqtt:connect(Client) -> {ok, Properties} | {error, Reason}

  Types

    Client = client()

    Properties = properties()

    Reason = timeout | inet:posix() | any()

Connect to the MQTT server over TCP or TLS and send a CONNECT packet with the options specified in start_link/1. Client must be a pid returned from start_link/0 or start_link/1, or a name registered through the name option of start_link/1.

Returns:

  • {ok, Properties} if an MQTT connection is established. Properties contains the properties of the CONNACK packet returned by the MQTT server.

  • {error, timeout} if the connection can't be established within the specified time.

  • {error, inet:posix()} A POSIX error value if something else goes wrong.

emqtt:ws_connect(Client) -> {ok, Properties} | {error, Reason}

  Types

    Same as emqtt:connect/1

Connect to the MQTT server over WebSocket and send a CONNECT packet with the options specified in start_link/1. Client must be a pid returned from start_link/0 or start_link/1, or a name registered through the name option of start_link/1.

emqtt:disconnect(Client) -> ok | {error, Reason}

emqtt:disconnect(Client, ReasonCode) -> ok | {error, Reason}

emqtt:disconnect(Client, ReasonCode, Properties) -> ok | {error, Reason}

  Types

    Client = client()

    ReasonCode = reason_code()

    Properties = properties()

    Reason = closed | inet:posix()

Send a DISCONNECT packet to the MQTT server. ReasonCode specifies an MQTT reason code for the DISCONNECT packet and defaults to 0, meaning normal disconnection. Properties specifies properties for the DISCONNECT packet and defaults to #{}, meaning no properties are attached.

emqtt:ping(Client) -> pong | {error, Reason}

  Types

    Client = client()

    Reason = ack_timeout

Send a PINGREQ packet to the MQTT server. If a PINGRESP packet is received from the server within the timeout period, pong is returned. Otherwise, {error, ack_timeout} is returned.

emqtt:subscribe(Client, Properties, Subscriptions) -> {ok, Properties, ReasonCodes} | {error, Reason}

  Types

    Client = client()

    Properties = properties()

    Subscriptions = [{topic(), [subopt()]}]

    ReasonCodes = [reason_code()]

    Reason = term()

Send a SUBSCRIBE packet to the MQTT server. Properties specifies properties for the SUBSCRIBE packet and defaults to #{}, meaning no properties are attached. Subscriptions specifies pairs of topic filter and subscription options. The topic filter is required; the subscription options can be [], which is equivalent to [{rh, 0}, {rap, false}, {nl, false}, {qos, 0}].
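
A SUBACK carries one reason code per requested topic filter, in request order, and in MQTT a code of 16#80 or above means the subscription was refused. The sketch below pairs the two lists to find refused filters. The module and function names are hypothetical, and it assumes the ReasonCodes list returned by emqtt:subscribe/3 has the same length and order as Subscriptions.

```erlang
-module(subscribe_example).
-export([rejected/2, subscribe/2]).

%% Pair each requested topic filter with its SUBACK reason code and keep the refusals.
%% Reason codes of 16#80 and above indicate that the server refused the subscription.
rejected(Subscriptions, ReasonCodes) ->
    [{Topic, RC} || {{Topic, _SubOpts}, RC} <- lists:zip(Subscriptions, ReasonCodes),
                    RC >= 16#80].

%% Subscribe with no SUBSCRIBE properties and report which filters were refused.
subscribe(Client, Subscriptions) ->
    case emqtt:subscribe(Client, #{}, Subscriptions) of
        {ok, _Props, ReasonCodes} -> {ok, rejected(Subscriptions, ReasonCodes)};
        {error, Reason}           -> {error, Reason}
    end.
```

For example, subscribe(Client, [{<<"hello">>, [{qos, 1}]}]) returns {ok, []} when the server grants the subscription.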

emqtt:unsubscribe(Client, Properties, Topics) -> {ok, Properties, ReasonCodes} | {error, Reason}

  Types

    Client = client()

    Properties = properties()

    Topics = [topic()]

    ReasonCodes = [reason_code()]

    Reason = term()

Send an UNSUBSCRIBE packet to the MQTT server. Properties specifies properties for the UNSUBSCRIBE packet and defaults to #{}, meaning no properties are attached. Topics specifies a list containing at least one topic filter.

emqtt:publish(Client, Topic, Properties, Payload, PubOpts) -> ok | {ok, PacketId} | {error, Reason}

  Types

    Client = client()

    Topic = topic()

    Properties = properties()

    Payload = payload()

    PubOpts = [pubopt()]

    PacketId = packet_id()

    Reason = term()

Send a PUBLISH packet to the MQTT server. Topic, Properties and Payload specify the topic, properties and payload of the PUBLISH packet. PubOpts specifies the QoS and retain flag of the PUBLISH packet and defaults to [], which is equivalent to [{qos, 0}, {retain, false}].

Returns:

  • ok if a QoS 0 packet is sent.

  • {ok, PacketId} if a QoS 1/2 packet is sent, the packet identifier will be returned.

  • {error, Reason} if something goes wrong.
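
The three return shapes listed above can be handled with a single pattern match. This is a minimal sketch; the module name, the function names, and the result atoms (sent_without_ack, awaiting_ack, failed) are hypothetical and chosen only for illustration.

```erlang
-module(publish_result_example).
-export([describe/1, publish/4]).

%% Classify the documented return values of emqtt:publish/5.
describe(ok)              -> sent_without_ack;          %% QoS 0: no packet identifier
describe({ok, PacketId})  -> {awaiting_ack, PacketId};  %% QoS 1/2: identifier returned
describe({error, Reason}) -> {failed, Reason}.

%% Publish a non-retained message with empty properties and classify the outcome.
publish(Client, Topic, Payload, QoS) ->
    describe(emqtt:publish(Client, Topic, #{}, Payload, [{qos, QoS}, {retain, false}])).
```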

emqtt:puback(Client, PacketId) -> ok

emqtt:puback(Client, PacketId, ReasonCode) -> ok

emqtt:puback(Client, PacketId, ReasonCode, Properties) -> ok

  Types

    Client = client()

    PacketId = packet_id()

    ReasonCode = reason_code()

    Properties = properties()

Send a PUBACK packet to the MQTT server. PacketId, ReasonCode and Properties specify packet identifier, reason code and properties for PUBACK packet.
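
When a client is started with {auto_ack, false}, the application sends these acknowledgements itself. The sketch below picks the acknowledgement that MQTT requires for each QoS level. The module and function names are hypothetical, and the shape of the received message (a map with qos and packet_id keys) is an assumption that should be checked against the emqtt version in use.

```erlang
-module(manual_ack_example).
-export([ack_kind/1, ack/2]).

%% Decide which acknowledgement a received PUBLISH calls for, based on its QoS:
%% QoS 0 needs none, QoS 1 is answered with PUBACK, QoS 2 with PUBREC.
ack_kind(0) -> none;
ack_kind(1) -> puback;
ack_kind(2) -> pubrec.

%% Acknowledge a received message by hand.
%% ASSUMPTION: Msg is a map carrying qos and packet_id keys.
ack(Client, #{qos := QoS, packet_id := PacketId}) ->
    case ack_kind(QoS) of
        none   -> ok;
        puback -> emqtt:puback(Client, PacketId);
        pubrec -> emqtt:pubrec(Client, PacketId)
    end.
```

For QoS 2, the exchange continues after PUBREC: the server answers with PUBREL, and the receiver completes the flow with PUBCOMP.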

emqtt:pubrec(Client, PacketId) -> ok

emqtt:pubrec(Client, PacketId, ReasonCode) -> ok

emqtt:pubrec(Client, PacketId, ReasonCode, Properties) -> ok

  Types

    Same as emqtt:puback/2, 3, 4.

Send a PUBREC packet to the MQTT server. PacketId, ReasonCode and Properties specify packet identifier, reason code and properties for PUBREC packet.

emqtt:pubrel(Client, PacketId) -> ok

emqtt:pubrel(Client, PacketId, ReasonCode) -> ok

emqtt:pubrel(Client, PacketId, ReasonCode, Properties) -> ok

  Types

    Same as emqtt:puback/2, 3, 4.

Send a PUBREL packet to the MQTT server. PacketId, ReasonCode and Properties specify packet identifier, reason code and properties for PUBREL packet.

emqtt:pubcomp(Client, PacketId) -> ok

emqtt:pubcomp(Client, PacketId, ReasonCode) -> ok

emqtt:pubcomp(Client, PacketId, ReasonCode, Properties) -> ok

  Types

    Same as emqtt:puback/2, 3, 4.

Send a PUBCOMP packet to the MQTT server. PacketId, ReasonCode and Properties specify packet identifier, reason code and properties for PUBCOMP packet.

emqtt:subscriptions(Client) -> Subscriptions

  Types

    Client = client()

    Subscriptions = [{topic(), [subopt()]}]

Return all subscriptions.

emqtt:stop(Client) -> ok

  Types

    Client = client()

Stop a client process.

emqtt:pause(Client) -> ok

  Types

    Client = client()

Pause the client process. A paused client process ignores all received PUBLISH packets and does not send PINGREQ packets if force_ping is set to false.

emqtt:resume(Client) -> ok

  Types

    Client = client()

Resume a client process from a paused state.

Examples

%% Example client identifier; replace with your own.
ClientId = <<"test">>.
{ok, ConnPid} = emqtt:start_link([{clientid, ClientId}]).
{ok, _} = emqtt:connect(ConnPid).

SubOpts = [{qos, 1}].
{ok, _, _ReasonCodes} = emqtt:subscribe(ConnPid, #{}, [{<<"hello">>, SubOpts}]).

ok = emqtt:publish(ConnPid, <<"hello">>, #{}, <<"Hello World!">>, [{qos, 0}]).
{ok, _PktId} = emqtt:publish(ConnPid, <<"hello">>, #{}, <<"Hello World!">>, [{qos, 1}]).

%% Wait for one message sent by the client process to its owner.
receive
    {disconnected, ReasonCode, Properties} ->
        io:format("Recv a DISCONNECT packet - ReasonCode: ~p, Properties: ~p~n", [ReasonCode, Properties]);
    {publish, PUBLISH} ->
        io:format("Recv a PUBLISH packet: ~p~n", [PUBLISH]);
    {puback, {PacketId, ReasonCode, Properties}} ->
        io:format("Recv a PUBACK packet - PacketId: ~p, ReasonCode: ~p, Properties: ~p~n", [PacketId, ReasonCode, Properties])
end.

%% Topics must be a list of topic filters.
{ok, _, _} = emqtt:unsubscribe(ConnPid, #{}, [<<"hello">>]).

ok = emqtt:disconnect(ConnPid).
ok = emqtt:stop(ConnPid).

License

Apache License Version 2.0

Author

EMQX Team.

emqtt's People

Contributors

ansd, arpunk, belltoy, callbay, emqplus, gilbertwong96, h6w, heri16, hiroeorz, hjianbo, iequ1, jadercorrea, keynslug, kjellwinblad, lafirest, mingchuno, paulswartz, phanimahesh, qingchuwudi, qzhuyan, rory-z, savonarola, thalesmg, tigercl, turtledeng, wwhai, x1001100011, zhengweixing, zhongwencool, zmstone

emqtt's Issues

connected event not sent

Hello. Thanks for migrating to the new MQTT client.

I am missing the connected message. It seems that it is not sent anywhere in the code. It was useful for getting notified once the connection is established, so that the subscriptions can be made.

Will you add it back, please?

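Getting the sender's ID

How can I get the sender's ID to determine whether the sender is the client itself?

Is it currently only possible to get the topic and payload data?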

Need to secure SSL/TLS and provide a better README

The SSL options example in the README currently suggests neither verify nor a dhfile. The :dhfile option should be recommended to prevent precalculated attacks. The :verify option is needed if client-SSL authentication is not used; otherwise, any self-signed certificate presented by an attacker would be regarded as valid. Also, this library does not verify that the certificate identity is valid for the target domain of the server, which makes a man-in-the-middle attack possible. Check out this security talk from ElixirConf.eu 2016:

https://youtu.be/r0DuAse9tK8

Feature Parity with mqtt.js

mqtt.js provides features to simplify handling of intermittent connections, including progressive backoff retries, automatic re-subscription upon connection, and queued offline publishing with configurable drain rate.

Which of these features does emqttc lack?

How to restrict publish and subscribe using ACL and topic permissions.

I have a 2-node emq cluster. I want to restrict clients from subscribing and publishing to topics that don't concern them. I saw ACL in the docs, but it isn't very clear; some light on the topic would be much appreciated.

TL;DR: Help needed in implementing ACL and Topic restriction on clients.

{:error, {:keyfile, :function_clause}}

iex(1)> {:ok, conn} = :emqtt.start_link([{:client_id, "maxim"},{:force_ping,true},
        {:ssl, true},
        {:ssl_opts,[{:certfile,'../../../deps/ca/priv/ca_ecdsa/certs/client.pem'},
                    {:keyfile,'../../../deps/ca/priv/ca_ecdsa/certs/client.key'},
                    {:ciphers,['ECDHE-ECDSA-AES256-GCM-SHA384']},
                    {:port,8883},
                    {:host,"127.0.0.1"},
                    {:cacertfile,'../../../deps/ca/priv/ca_ecdsa/certs/caroot.pem'}]}])
{:ok, #PID<0.3857.0>}
iex(2)> :emqtt.connect conn
{:error, {:keyfile, :function_clause}}

Would it be possible to apply a tag to the latest version?

Hello,

We would like to use the latest version of this package, but our software clearing team would like for us to pull the code based on a tag and not the latest version. Would it be possible for the maintainer to tag the latest version on the master branch so we could pull by that tag?

Thank you!
David Wolf

emqttc bug

Hope you are doing well. We are using emqttc in our application and are facing errors in the receive block. Some of the messages are completely jumbled, with a dirty payload buffer. Also, the message with the error keeps repeating (emqttc keeps publishing the same message).

We are using it in an Elixir GenServer. The relevant blocks are as follows:

GenServer handle_info to receive publish events

def handle_info({:publish, channel, message}, %{client: client, chan: chan}) do
  spawn fn -> mqtt_receive(channel, message, chan) end
  {:noreply, %{client: client, chan: chan}}
end

defp mqtt_receive(channel, payload, chan) do
  [_|values] = String.split(channel, "/")
  [service|[iris_id|[request|[]]]] = values
  try do
    case JSON.decode(payload) do
      {:ok, message} ->
        payload = %{message: message}
        case JSON.encode(payload) do
          {:ok, jsonpayload} ->
            ####
            :ok
          {:error, msg} ->
            IO.puts "Encode Error: #{msg}"
        end
      {:error, err} ->
        IO.puts "Error: #{err}"
    end
  rescue
    exception ->
      IO.puts "Error while decoding json from mosca"
      :ok
  end
end

Should send message to client on (re-)connect or handle re-subscribes also

When the MQTT connection is broken and re-established, the subscriptions are gone (just as expected). This means that the client has to re-subscribe to its topics. But as there is no (info) message for this event, there is no way to do so.

My proposal is to have a handle_info(connected, State = #state{mqttc = C, seq = I}) function which is always called after connack, no matter if it is the first connect or a reconnect. Thus this method can be a central point for managing subscriptions.

Alternatively the client could remember the subscriptions and restore them after a reconnect. But I don't think this would be a good idea: Many clients use last-will messages which trigger some stuff and then use on-connected callbacks to indicate to the application that they have regained connection. Therefore a manual connected handling would be preferable IMHO.

Crash after "Merge branch 'hiroeorz-master'" 7c6de640c6a3afadb6f9bb35d515411384a159ae

Hi,

I can't start emqttc after the 7c6de64 commit.

([email protected])2> {ok, Pid} = emqttc:start_link([{host, "test.mosquitto.org"}]).
** exception exit: {badmatch,false}
in function emqttc:init/1 (src/emqttc.erl, line 269)
in call from gen_fsm:init_it/6 (gen_fsm.erl, line 363)
in call from proc_lib:init_p_do_apply/3 (proc_lib.erl, line 239)
([email protected])3> 11:48:42.453 [error] CRASH REPORT Process <0.322.0> with 1 neighbours exited with reason: no match of right hand value false in emqttc:init/1 line 269 in gen_fsm:init_it/6 line 381

Cannot differentiate publish from different clients

From one process I make two MQTT connections and subscribe to the same topic. When a published message is received I cannot see a way to determine from which connection I received it.

{ok, C1} = emqttc:start_link([{host, "server1"}]),
emqttc:subscribe(C1, <<"OneTopic">>),
{ok, C2} = emqttc:start_link([{host, "server2"}]),
emqttc:subscribe(C2, <<"OneTopic">>),
receive
    {publish, Topic, Payload} ->
        % Is it C1 or C2????
        io:format("Message Received from ~s: ~p~n", [Topic, Payload])
end.

Have you thought about this problem? A standard solution would be to include the PID (C1 or C2) in the {publish} message; however, that would break backwards compatibility.
Do you see a solution other than wrapping emqttc in another process and always establishing only one emqttc connection from each process?

If broker is not pinged?

I have a problem checking whether the broker is available. If the broker is not available when I call emqttc:start_link([{host, "localhost"}, {client_id, <<"simpleClient">>}]), the process exits with {shutdown, econnrefused}. Is it possible to check whether the broker is running?

Regression: sync_subscribe topic causes bad_match

Calling emqttc:subscribe over an SSL socket works fine, but emqttc:sync_subscribe fails even for a single topic.

iex(1)> {:ok, client} = :emqttc.start_link(connect_options)
iex(2)> topics = [{"$aws/things/frestive-cps-rene-3001/shadow/update/delta", 0},
   {"$aws/things/frestive-cps-rene-3001/shadow/update/accepted", 0},
   {"$aws/things/frestive-cps-rene-3001/shadow/update/rejected", 0},
   {"$aws/things/frestive-cps-rene-3001/shadow/get/accepted", 0},
   {"$aws/things/frestive-cps-rene-3001/shadow/get/rejected", 0},
   {"$aws/things/frestive-cps-rene-3001/shadow/delete/accepted", 0},
   {"$aws/things/frestive-cps-rene-3001/shadow/delete/rejected", 0}]
iex(3)> :emqttc.sync_subscribe(client, topics)

[info] [Client <0.170.0>]: connecting to a2qbxv4743ghh5.iot.ap-southeast-1.amazonaws.com:8883
[error] ** State machine #PID<0.159.0> terminating
** Last message in was {:"$gen_sync_event", {#PID<0.158.0>, #Reference<0.0.2.34
>},
 {:subscribe, #PID<0.158.0>,
  [{"$aws/things/frestive-cps-rene-3001/shadow/update/delta", 0},
   {"$aws/things/frestive-cps-rene-3001/shadow/update/accepted", 0},
   {"$aws/things/frestive-cps-rene-3001/shadow/update/rejected", 0},
   {"$aws/things/frestive-cps-rene-3001/shadow/get/accepted", 0},
   {"$aws/things/frestive-cps-rene-3001/shadow/get/rejected", 0},
   {"$aws/things/frestive-cps-rene-3001/shadow/delete/accepted", 0},
   {"$aws/things/frestive-cps-rene-3001/shadow/delete/rejected", 0}]}}
** When State == :connected
**      Data  == {:state, #PID<0.158.0>, '<0.159.0>',
 'a2qbxv4743ghh5.iot.ap-southeast-1.amazonaws.com', 8883,
 {:ssl_socket, #Port<0.8960>,
  {:sslsocket, {:gen_tcp, #Port<0.8960>, :tls_connection, :undefined},
   #PID<0.165.0>}}, #PID<0.167.0>,
 {:proto_state,
  {:ssl_socket, #Port<0.8960>,
   {:sslsocket, {:gen_tcp, #Port<0.8960>, :tls_connection, :undefined},
    #PID<0.165.0>}}, '10.101.10.218:24665', 4, "MQTT", "frestive-cps-rene-3001",
  true, 60, false,
  {:mqtt_message, 0, false, false, :undefined, :undefined, :undefined},
  :undefined, :undefined, 1, %{}, %{}, %{}, %{},
  {:gen_logger, :console_logger, 2}}, [], %{}, [], [], %{}, :undefined, false,
 {:keepalive,
  {:ssl_socket, #Port<0.8960>,
   {:sslsocket, {:gen_tcp, #Port<0.8960>, :tls_connection, :undefined},
    #PID<0.165.0>}}, :send_oct, 1700, 60, {:keepalive, :timeout},
  #Reference<0.0.2.344>}, 60, 30, 4, 8, :undefined, :ssl, :undefined,
 {:gen_logger, :console_logger, 2}, [],
 [cacertfile: 'config/certs/root-CA.crt',
  certfile: 'config/certs/fe354dc277-certificate.pem.crt',
  keyfile: 'config/certs/fe354dc277-private.pem.key']}
** Reason for termination =
** {{:badmatch,
  {:next_state, :connected,
   {:state, #PID<0.158.0>, '<0.159.0>',
    'a2qbxv4743ghh5.iot.ap-southeast-1.amazonaws.com', 8883,
    {:ssl_socket, #Port<0.8960>,
     {:sslsocket, {:gen_tcp, #Port<0.8960>, :tls_connection, :undefined},
      #PID<0.165.0>}}, #PID<0.167.0>,
    {:proto_state,
     {:ssl_socket, #Port<0.8960>,
      {:sslsocket, {:gen_tcp, #Port<0.8960>, :tls_connection, :undefined},
       #PID<0.165.0>}}, '10.101.10.218:24665', 4, "MQTT",
     "frestive-cps-rene-3001", true, 60, false,
     {:mqtt_message, 0, false, false, :undefined, :undefined, :undefined},
     :undefined, :undefined, 2,
     %{"$aws/things/frestive-cps-rene-3001/shadow/delete/accepted" => 0,
       "$aws/things/frestive-cps-rene-3001/shadow/delete/rejected" => 0,
       "$aws/things/frestive-cps-rene-3001/shadow/get/accepted" => 0,
       "$aws/things/frestive-cps-rene-3001/shadow/get/rejected" => 0,
       "$aws/things/frestive-cps-rene-3001/shadow/update/accepted" => 0,
       "$aws/things/frestive-cps-rene-3001/shadow/update/delta" => 0,
       "$aws/things/frestive-cps-rene-3001/shadow/update/rejected" => 0}, %{},
     %{}, %{}, {:gen_logger, :console_logger, 2}},
    [{#PID<0.158.0>, #Reference<0.0.2.349>}],
    %{"$aws/things/frestive-cps-rene-3001/shadow/delete/accepted" => {0,
       [#PID<0.158.0>]},
      "$aws/things/frestive-cps-rene-3001/shadow/delete/rejected" => {0,
       [#PID<0.158.0>]},
      "$aws/things/frestive-cps-rene-3001/shadow/get/accepted" => {0,
       [#PID<0.158.0>]},
      "$aws/things/frestive-cps-rene-3001/shadow/get/rejected" => {0,
       [#PID<0.158.0>]},
      "$aws/things/frestive-cps-rene-3001/shadow/update/accepted" => {0,
       [#PID<0.158.0>]},
      "$aws/things/frestive-cps-rene-3001/shadow/update/delta" => {0,
       [#PID<0.158.0>]},
      "$aws/things/frestive-cps-rene-3001/shadow/update/rejected" => {0,
       [#PID<0.158.0>]}}, [], [], %{}, 1, false,
    {:keepalive,
     {:ssl_socket, #Port<0.8960>,
      {:sslsocket, {:gen_tcp, #Port<0.8960>, :tls_connection, :undefined},
       #PID<0.165.0>}}, :send_oct, 1700, 60, {:keepalive, :timeout},
     #Reference<0.0.2.344>}, 60, 30, 4, 8, :undefined, :ssl, :undefined,
    {:gen_logger, :console_logger, 2}, [],
    [cacertfile: 'config/certs/root-CA.crt',
     certfile: 'config/certs/fe354dc277-certificate.pem.crt',
     keyfile: 'config/certs/fe354dc277-private.pem.key']}, :hibernate}},
 [{:emqttc, :connected, 3, [file: 'src/emqttc.erl', line: 671]},
  {:gen_fsm, :handle_msg, 7, [file: 'gen_fsm.erl', line: 518]},
  {:proc_lib, :wake_up, 3, [file: 'proc_lib.erl', line: 250]}]}

** (exit) exited in: GenServer.call(:thing_shadow, {:thing_register, "frestive-cps-rene-3001", []}, 5000)
    ** (EXIT) an exception was raised:
        ** (MatchError) no match of right hand side value: {:next_state, :connected, {:state, #PID<0.158.0>, '<0.159.0>', 'a2qbxv4743ghh5.iot.ap-southeast-1.amazonaws.com', 8883, {:ssl_socket, #Port<0.8960>, {:sslsocket, {:gen_tcp, #Port<0.8960>, :tls_connection, :undefined}, #PID<0.165.0>}}, #PID<0.167.0>, {:proto_state, {:ssl_socket, #Port<0.8960>, {:sslsocket, {:gen_tcp, #Port<0.8960>, :tls_connection, :undefined}, #PID<0.165.0>}}, '10.101.10.218:24665', 4, "MQTT", "frestive-cps-rene-3001", true, 60, false, {:mqtt_message, 0, false, false, :undefined, :undefined, :undefined}, :undefined, :undefined, 2, %{"$aws/things/frestive-cps-rene-3001/shadow/delete/accepted" => 0, "$aws/things/frestive-cps-rene-3001/shadow/delete/rejected" => 0, "$aws/things/frestive-cps-rene-3001/shadow/get/accepted" => 0, "$aws/things/frestive-cps-rene-3001/shadow/get/rejected" => 0, "$aws/things/frestive-cps-rene-3001/shadow/update/accepted" => 0, "$aws/things/frestive-cps-rene-3001/shadow/update/delta" => 0, "$aws/things/frestive-cps-rene-3001/shadow/update/rejected" => 0}, %{}, %{}, %{}, {:gen_logger, :console_logger, 2}}, [{#PID<0.158.0>, #Reference<0.0.2.349>}], %{"$aws/things/frestive-cps-rene-3001/shadow/delete/accepted" => {0, [#PID<0.158.0>]}, "$aws/things/frestive-cps-rene-3001/shadow/delete/rejected" => {0, [#PID<0.158.0>]}, "$aws/things/frestive-cps-rene-3001/shadow/get/accepted" => {0, [#PID<0.158.0>]}, "$aws/things/frestive-cps-rene-3001/shadow/get/rejected" => {0, [#PID<0.158.0>]}, "$aws/things/frestive-cps-rene-3001/shadow/update/accepted" => {0, [#PID<0.158.0>]}, "$aws/things/frestive-cps-rene-3001/shadow/update/delta" => {0, [#PID<0.158.0>]}, "$aws/things/frestive-cps-rene-3001/shadow/update/rejected" => {0, [#PID<0.158.0>]}}, [], [], %{}, 1, false, {:keepalive, {:ssl_socket, #Port<0.8960>, {:sslsocket, {:gen_tcp, #Port<0.8960>, :tls_connection, :undefined}, #PID<0.165.0>}}, :send_oct, 1700, 60, {:keepalive, :timeout}, #Reference<0.0.2.344>}, 60, 30, 4, 8, :undefined, :ssl, :undefined, {:gen_logger, :console_logger, 2}, [], [cacertfile: 'config/certs/root-CA.crt', certfile: 'config/certs/fe354dc277-certificate.pem.crt', keyfile: 'config/certs/fe354dc277-private.pem.key']}, :hibernate}
            src/emqttc.erl:671: :emqttc.connected/3
            (stdlib) gen_fsm.erl:518: :gen_fsm.handle_msg/7
            (stdlib) proc_lib.erl:250: :proc_lib.wake_up/3
    (elixir) lib/gen_server.ex:564: GenServer.call/3

Unexpected PUBACK packet received

=ERROR REPORT==== 12-Jul-2019::11:49:17.754315 ===
emqtt(emqs-MacBook-Pro_bench_pub_1_2835554768): State: connected, Unexpected Event: (cast, {mqtt_packet,
                                                                                            {mqtt_packet_header,
                                                                                             5,
                                                                                             false,
                                                                                             0,
                                                                                             false},
                                                                                            {mqtt_packet_puback,
                                                                                             18024,
                                                                                             147,
                                                                                             #{}},
                                                                                            undefined})


=ERROR REPORT==== 12-Jul-2019::12:10:49.543783 ===
emqtt(emqs-MacBook-Pro_bench_pub_1_2641837960): State: connected, Unexpected Event: (cast, {mqtt_packet,
                                                                                            {mqtt_packet_header,
                                                                                             5,
                                                                                             false,
                                                                                             0,
                                                                                             false},
                                                                                            {mqtt_packet_puback,
                                                                                             5076,
                                                                                             147,
                                                                                             #{}},
                                                                                            undefined})

Tag 0.7.1-beta: auto resubscribe with the unsubscribed topic already

Scenario:
Connect to the broker:
{ok, ClientA} = emqttc:start_link([{host, "localhost"}, {client_id, <<"simpleClient">>}, {logger, info}, auto_resub, {reconnect, {3, 120, 10}}]),

Subscribe to topic <<"mongo">> with client A.
Subscribe to topic <<"orange">> with client A.
Unsubscribe from topic <<"mongo">> with client A.
On the broker admin page, confirm that topic <<"mongo">> has been removed from the subscriptions list.
Stop the emqttd broker, then start it again; emqttc reconnects to the broker successfully.

Observed issue:
On the broker admin page, both <<"orange">> and <<"mongo">> are now in the subscriptions list.

Expected result:
Topic <<"mongo">> should not be in the subscriptions list.
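The expected behaviour in this scenario can be expressed as a small bookkeeping rule: the set of topics to resubscribe after a reconnect should shrink when a topic is unsubscribed. The following is a minimal sketch under that assumption. The module `resub_table` and its functions are hypothetical and do not reflect how emqttc stores subscriptions internally.

```erlang
-module(resub_table).
-export([new/0, subscribe/3, unsubscribe/2, to_resubscribe/1]).

%% Subscriptions are kept as a map from topic to QoS.
new() -> #{}.

%% Record (or overwrite) a subscription.
subscribe(Topic, QoS, Subs) -> Subs#{Topic => QoS}.

%% Forget a subscription so it is not replayed after a reconnect.
unsubscribe(Topic, Subs) -> maps:remove(Topic, Subs).

%% Topics to send in a SUBSCRIBE after reconnecting, in sorted order.
to_resubscribe(Subs) -> lists:sort(maps:to_list(Subs)).
```

Replaying the steps above (subscribe `<<"mongo">>`, subscribe `<<"orange">>`, unsubscribe `<<"mongo">>`) leaves only `<<"orange">>` in the list returned by `to_resubscribe/1`.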

Unable to Publish a message to EMQTT via Emqttc

I am sending a message from one user to another. In the emqttd plugin template I can see that the message gets published, delivered and acked, but when I publish the same message from emqttd code I get this error:

15:00:28.474 <0.376.0> [error] Session(User): Unexpected EXIT: client_pid=<0.375.0>, exit_pid=<0.445.0>, reason=normal
client emqttc_Kunals-MBP_1cb77deeefbd4efccfa9 connected, connack: 0
client emqttc_Kunals-MBP_1cb77deeefbd4efccfa9 disconnected, reason: normal
15:11:16.489 <0.410.0> [error] Unexpected Info: {mqttc,<0.411.0>,connected}
15:11:16.489 <0.410.0> [error] Session(emqttc_Kunals-MBP_27bddc8d6d80fda6d6ce): Unexpected EXIT: client_pid=undefined, exit_pid=<0.411.0>, reason=normal

It looks as if the session of the user I am sending the message to gets closed.

My code is shown below:

{ok, C1} = emqttc:start_link([{host, "localhost"}, {port, 17576}, {clean_sess, false}, {logger, {lager, error}}]),
emqttc:subscribe(C1, <<"kernal">>, 1),
emqttc:publish(C1, <<"kernal">>, <<"hello world">>, 1),
receive
    {publish, Topic, Payload} ->
        io:format("Message Received from ~s: ~p~n", [Topic, Payload])
after
    1000 ->
        io:format("Error: receive timeout!~n")
end,
emqttc:disconnect(C1).

Please help me resolve this issue.

How to wait for receiving PUBACK message ?

I wrote the following simple test code:

...
    %% connect MQTT broker
    {ok, Client} = emqttc:start_link([{host, Host}, {port, Port}, {client_id, ClientId}, {proto_ver, 3},
                       {username, UserName}, {password, Password}]),

    %% publish the message
    emqttc:publish(Client, ?TEST_TOPIC, ?TEST_PAYLOAD, qos1),
...

If I don't add a delay, the client does not receive the PUBACK. Is that correct?
Have you ever considered adding a new state such as 'waiting_for_puback' (similar to 'waiting_for_connack')?
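Instead of a fixed delay, the caller can block on a selective receive with a timeout until an acknowledgement for the right packet arrives. The sketch below only illustrates that pattern. The `{puback, PacketId}` message shape, the module `puback_wait`, and the simulated broker process are assumptions for illustration and are not emqttc's actual messages or API.

```erlang
-module(puback_wait).
-export([await_ack/2, demo/0]).

%% Block until {puback, PacketId} arrives or TimeoutMs elapses.
%% The message shape is hypothetical, not taken from emqttc.
await_ack(PacketId, TimeoutMs) ->
    receive
        {puback, PacketId} -> ok
    after TimeoutMs ->
        {error, timeout}
    end.

demo() ->
    Self = self(),
    %% Stand-in for the broker: acknowledges packet 1 after 50 ms.
    spawn(fun() -> timer:sleep(50), Self ! {puback, 1} end),
    Acked = await_ack(1, 1000),     %% ack arrives well within the timeout
    Missing = await_ack(2, 100),    %% nobody acknowledges packet 2
    {Acked, Missing}.
```

Because the receive clause matches on the packet id, acknowledgements for other packets stay in the mailbox and are not consumed by mistake.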

function_clause error...

** {function_clause,
       [{emqttc,connecting,
            [{subscribe,<0.10884.2>,[{<<"client1/24539/2717767000">>,0}]},
             {state,<0.10884.2>,"<0.10885.2>","server",1883,undefined,
                 undefined,
                 {proto_state,undefined,undefined,4,<<"MQTT">>,
                     <<"testclient_client1_24539_1905181425">>,true,60,false,
                     {mqtt_message,0,false,false,undefined,undefined,
                         undefined},
                     undefined,undefined,1,#{},#{},#{},#{},
                     {gen_logger,console_logger,4}},
                 [],#{},[],[],false,undefined,60,30,undefined,tcp,undefined,
                 {gen_logger,console_logger,4}}],
            [{file,"src/emqttc.erl"},{line,351}]},
        {gen_fsm,handle_msg,7,[{file,"gen_fsm.erl"},{line,503}]},
        {proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,237}]}]}
client 24539 EXIT: {function_clause,

Receiving large messages

When receiving messages of a few kilobytes, I found that memory usage shoots up to unsustainable levels. Looking deeper into the code, I believe the (very elegant) way in which packets are assembled, by appending binaries and returning functions with closures, could be the "offender".
Would it be reasonable to change this to use iolists and perhaps avoid returning functions that accumulate closures? Or am I misusing emqttc altogether?
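The iolist idea can be sketched as follows: keep incoming chunks in a list and build a single binary only once the packet is complete. This is a minimal illustration, assuming the chunks arrive as binaries. The module `chunk_acc` is hypothetical and is not part of emqttc, and whether this reduces memory in practice depends on how the parser holds on to intermediate binaries.

```erlang
-module(chunk_acc).
-export([new/0, add/2, finish/1]).

%% The accumulator is a list of chunks in reverse arrival order.
new() -> [].

%% Prepending is O(1) and does not copy earlier chunks.
add(Chunk, Acc) when is_binary(Chunk) -> [Chunk | Acc].

%% Restore arrival order and flatten into one binary at the end.
finish(Acc) -> iolist_to_binary(lists:reverse(Acc)).
```

A caller would invoke `add/2` for every received chunk and `finish/1` once the remaining-length field of the packet has been satisfied.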

emqttc is not sending PINGREQs to Mosquitto broker.

TL;DR: Mosquitto and emqttc do not work well with each other. Other brokers work fine with emqttc.

Erlang version: Erlang/OTP 20 [erts-9.0]
Elixir version: 1.5.1
Mosquitto version: 1.4.10

(Have also tested on Erlang/OTP 19.2 and Elixir 1.4.5)

The messages I get from Mosquitto are:

1505752794: Client xams_router has exceeded timeout, disconnecting.
1505752794: Socket error on client xams_router, disconnecting.

The log output from emqttc when the broker disconnects the client (This log uses a keepalive of 5 seconds):

19:06:30.903 [info]  MQTT client now connecting to broker.

19:06:30.903 [debug] Connecting without SSL.
[info] [Client <0.244.0>]: connecting to localhost:1883

19:06:30.915 [debug] Connected!
19:06:30.915 [info] Application xams started on node nonode@nohost
Interactive Elixir (1.4.5) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> [debug] [[email protected]:33967] SENT: CONNECT(Q0, R0, D0, ClientId=xams_router, ProtoName=MQTT, ProtoVsn=4, CleanSess=false, KeepAlive=5, Username=xams, Password=******)
[debug] [[email protected]:33967] SENT: <<16,39,0,4,77,81,84,84,4,192,0,5,
                                              0,11,120,97,109,115,95,114,111,
                                              117,116,101,114,0,4,120,97,109,
                                              115,0,8,112,97,115,115,119,48,
                                              114,100>>
[info] [Client <0.244.0>] connected with localhost:1883
[info] [Client <0.244.0>] RECV: CONNACK_ACCEPT

19:06:30.938 [info]  Success. MQTT client connected.
[debug] [[email protected]:33967] SENT: SUBSCRIBE(Q1, R0, D0, PacketId=1, TopicTable=[{<<"xams/#">>,2}])

19:06:30.938 [debug] Begin subscribing phase.

19:06:30.938 [debug] Router about to subscribe to topic -> xams/#

19:06:30.938 [debug] Router now subscribed to topic -> xams/#

19:06:30.938 [debug] Finished subscribing phase.
[debug] [[email protected]:33967] SENT: <<130,11,0,1,0,6,120,97,109,115,47,
                                              35,2>>
[debug] [Client <0.244.0>] RECV: SUBACK(Q0, R0, D0, PacketId=1, QosTable=[2])

nil
iex(2)> [warning] [Client <0.244.0>] Connection lost for: tcp_closed

The code used to start emqttc is:

:emqttc.start_link([
      host: host,
      port: port,
      client_id: client_id,
      clean_sess: false,
      keepalive: 15,
      connack_timeout: 30,
      reconnect: {3, 60, 10},
      username: username,
      password: password])

The variables client_id, host, port, username, and password are all valid variables. They are not included here due to privacy concerns.

I subscribe to one topic - xams/# w/ the QoS setting set to :qos2.

I can't see any other reason for Mosquitto to do this. I've tested it on two different servers of mine, and the iot.eclipse.org Mosquitto server, which has the same issue.

EDIT:

I have tried the same code with VerneMQ and RabbitMQ. The bug is not present in those brokers.
I believe this issue is constrained to Mosquitto and emqttc.

falls when there is no connection

crasher: initial call: emqtt:init/1, pid: <0.6626.0>, registered_name: [], exit: {{noproc,{gen_statem,call,[<0.6621.0>,ping,infinity]}},[{gen_statem,loop_receive,3,[{file,"gen_statem.erl"},{line,894}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,249}]}]}, ancestors: [scanner_server,scanner_sup,<0.293.0>], message_queue_len: 0, messages: [], links: [], dictionary: [], trap_exit: true, status: running, heap_size: 6772, stack_size: 27, reductions: 13462; neighbours:
Why does it crash when there is no connection? Why is the user not given the option of handling this situation?
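The `noproc` reason in the report comes from `gen_statem:call/3` being invoked on a process that no longer exists. A caller that wants to handle this case itself can wrap the call and convert the exit into an error tuple. The sketch below assumes a hypothetical wrapper module `safe_call` and uses a short-lived stand-in process instead of a real client.

```erlang
-module(safe_call).
-export([call/2, demo/0]).

%% Call a gen_statem and turn "process is gone" or "no reply in time"
%% into error tuples instead of crashing the caller.
call(Pid, Request) ->
    try
        gen_statem:call(Pid, Request, 5000)
    catch
        exit:{noproc, _} -> {error, noproc};
        exit:{timeout, _} -> {error, timeout}
    end.

%% Stand-in for a client that has already terminated.
demo() ->
    {Pid, Ref} = spawn_monitor(fun() -> ok end),
    %% Wait until the process is really gone before calling it.
    receive {'DOWN', Ref, process, Pid, _} -> ok end,
    call(Pid, ping).
```

With this wrapper the calling process receives `{error, noproc}` and can decide whether to restart the client, retry later, or report the failure.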

clean_sess is not supported

received('CONNACK', State = #proto_state{clean_sess = true}) ->
    %%TODO: Send awaiting...
    {ok, State};

received('CONNACK', State = #proto_state{clean_sess = false}) ->
    %%TODO: Resume Session...
    {ok, State};

Can you point me to any guides on how to implement this in a client library?

SSL

How about adding ssl support to the client? That would be nice!

emqttc send willmsg

client ---->
[root@VM_196_198_centos emqtt_benchmark]# ./emqtt_bench_sub -c 1 -i 10 -t bench/%i -q 2
MqttOpts [{client_id,<<"MExjqXi7Q62EDZC5YQLL">>},
{will,[{qos,1},
{retain,false},
{topic,<<"WillTopic">>},
{payload,<<"dfdsf">>}]},
{username,<<"lsx">>},
{password,<<"FFFFFF">>},
{logger,error},
{clean_sess,true},
{keepalive,300},
{port,1883},
{host,"localhost"}]conneted: 0
client 1 EXIT: {shutdown,tcp_closed}

server --->
([email protected])1> 21:09:29.209 [info] lsx get State1 {proto_state,{{127,0,0,1},52041},#Fun<emqttd_client.0.99263764>,true,<<"MExjqXi7Q62EDZC5YQLL">>,<0.375.0>,true,4,<<"MQTT">>,<<"lsx">>,undefined,300,1024,undefined,undefined,{1446,901769,209120}}
21:09:29.209 [info] Client([email protected]:52041): RECV CONNECT(Q0, R0, D0, ClientId=MExjqXi7Q62EDZC5YQLL, ProtoName=MQTT, ProtoVsn=4, CleanSess=true, KeepAlive=300, Username=lsx, Password=******)

[help wanted] timeout

The application crashes with:
{{badarg,[{emqtt,'-timeout_calls/3-fun-0-',4,[{file,"/_build/default/lib/emqtt/src/emqtt.erl"},{line,1131}]},{lists,foldl,3,[{file,"lists.erl"},{line,1263}]},{emqtt,connected,3,[{file,"/_build/default/lib/emqtt/src/emqtt.erl"},{line,935}]},{gen_statem,call_state_function,5,[{file,"gen_statem.erl"},{line,1660}]},{gen_statem,loop_event_state_function,6,[{file,"gen_statem.erl"},{line,1023}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,249}]}]},{gen_statem,call,[<0.480.0>,ping,infinity]}}

Keep-Alive Stops

I have not fully investigated this issue yet, but I can post logs if you like.

Here are the conditions so far:

  • default KeepAlive=90
  • using ssl connections on port 8883

On initial connect, there is a proper:
[debug] [[email protected]:47297] SENT: PINGREQ(Q0, R0, D0)
[debug] [[email protected]:47297] SENT: <<192,0>>
[debug] [Client <0.3009.0>] RECV: PINGRESP(Q0, R0, D0)

This particular process is only doing subscriptions. There are no publishes occurring from this. After this initial KeepAlive, a topic is subscribed @ ~3:15:30 and SUBACK received shortly after.

[debug] [Client <0.3009.0>] RECV: SUBACK(Q0, R0, D0, PacketId=2, QosTable=[1])
03:16:00.769 [notice] <<...
03:16:00.770 [notice] <<...
03:16:00.771 [info] Write Event for: <<"topic">>
03:16:00.771 [info] Publishing <<"...
03:16:31.763 [notice] <<...
03:16:31.763 [notice] <<...
03:17:03.100 [notice] <<...
03:17:03.101 [notice] <<...
03:17:34.087 [notice] <<...
03:17:34.087 [notice] <<...
[warning] [Client <0.3009.0>] Connection lost for: ssl_closed
[info] [Client <0.3009.0>] try reconnecting...
03:17:45.489 [info] {mqttc,<0.3009.0>,disconnected}
[info] [Client <0.3009.0>]: connecting to ...

The issue is that no further KeepAlives are triggered. Since we are relying on the internal reconnect process at this point, the KeepAlive process does not appear to be restarted.

I have about a 30-40% chance per run to get it into this state. If it gets into this state, it never leaves.
