Tortoise

An MQTT client application that keeps connections to one or more MQTT brokers, handles subscriptions, and exposes a publisher for publishing messages to the broker.

Amongst other things Tortoise supports:

  • Keeping a connection to an MQTT server (version 3.1.1 for now)
  • Retry connecting with incremental back-off
  • Publishing and subscribing to topics of QoS 0, 1, and 2
  • Last will messages for connections
  • Connecting via TCP and SSL
  • The fundamentals are there, but some of the APIs might change in the near future
  • A PubSub system where one can listen to system events. For now connection status and ping response times can be subscribed for statistics and administrative purposes.

Most of the public-facing interface should be in the Tortoise module. See the GitHub issues for work in progress, known issues in the design, what needs to be done, and so forth; feel free to open your own issues if something is confusing or broken.

I would love to get some feedback and help building this thing.

Example

A supervised connection can be started like this:

# connect to the server and subscribe to foo/bar with QoS 0
Tortoise.Supervisor.start_child(
    client_id: "my_client_id",
    handler: {Tortoise.Handler.Logger, []},
    server: {Tortoise.Transport.Tcp, host: 'localhost', port: 1883},
    subscriptions: [{"foo/bar", 0}])

# publish a message on the broker
Tortoise.publish("my_client_id", "foo/bar", "Hello from the World of Tomorrow !", qos: 0)

To connect to an MQTT server using SSL, the Tortoise.Transport.SSL transport can be used. This requires configuration of the server's CA certificate, and possibly a client certificate and key. For example, when using the certifi package as the CA trust store:

Tortoise.Supervisor.start_child(
    client_id: "smart-spoon",
    handler: {Tortoise.Handler.Logger, []},
    server: {
      Tortoise.Transport.SSL,
      host: host, port: port,
      cacertfile: :certifi.cacertfile(),
      key: key, cert: cert
    },
    subscriptions: [{"foo/bar", 0}])

Alternatively, for testing purposes, server certificate verification can be disabled by passing verify: :verify_none in the server options. In that case there is no need for CA certificates, but an attacker could intercept the connection without detection!

Look at the connection_test.exs-file for more examples.

Example Handler

defmodule Tortoise.Handler.Example do
  use Tortoise.Handler

  def init(args) do
    {:ok, args}
  end

  def connection(status, state) do
    # `status` will be either `:up` or `:down`; you can use this to
    # inform the rest of your system if the connection is currently
    # open or closed; tortoise should be busy reconnecting if you get
    # a `:down`
    {:ok, state}
  end

  #  topic filter room/+/temp
  def handle_message(["room", room, "temp"], payload, state) do
    # :ok = Temperature.record(room, payload)
    {:ok, state}
  end
  def handle_message(topic, payload, state) do
    # unhandled message! You will crash if you subscribe to something
    # and you don't have a 'catch all' matcher; crashing on unexpected
    # messages could be a strategy though.
    {:ok, state}
  end

  def subscription(status, topic_filter, state) do
    {:ok, state}
  end

  def terminate(reason, state) do
    # tortoise doesn't care about what you return from terminate/2,
    # that is in alignment with other behaviours that implement a
    # terminate-callback
    :ok
  end
end
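
The clause heads in the handler above match on the topic split into its levels. The following is a minimal, Tortoise-free sketch of that idea; the module name TopicSketch and its route/2 function are hypothetical and only illustrate how a topic string maps onto the list patterns:

```elixir
defmodule TopicSketch do
  # Hypothetical helper, not part of Tortoise: routes a raw topic string
  # by splitting it into levels and pattern matching on the resulting list.
  def route(topic, payload) when is_binary(topic) do
    topic |> String.split("/") |> dispatch(payload)
  end

  # Corresponds to the topic filter room/+/temp
  defp dispatch(["room", room, "temp"], payload), do: {:temperature, room, payload}

  # Catch-all clause, so unexpected topics do not crash the caller
  defp dispatch(levels, _payload), do: {:unhandled, levels}
end
```

Calling TopicSketch.route("room/kitchen/temp", "21.5") takes the first clause with room bound to "kitchen"; any other topic falls through to the catch-all.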

Upgrade path

pre-0.9 to 0.9

The subscribe/3, unsubscribe/3, subscribe_sync/3, and unsubscribe_sync/3 functions are no longer exposed on the Tortoise module. The functionality has been moved to the Tortoise.Connection module. The functions have the same arities and functionality, so the upgrade path is a simple search and replace:

  • "Tortoise.subscribe(" -> "Tortoise.Connection.subscribe("
  • "Tortoise.subscribe_sync(" -> "Tortoise.Connection.subscribe_sync("
  • "Tortoise.unsubscribe(" -> "Tortoise.Connection.unsubscribe("
  • "Tortoise.unsubscribe_sync(" -> "Tortoise.Connection.unsubscribe_sync("

This change was made because the Tortoise.Connection module should be in charge of changes to the connection life-cycle.
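
The search and replace can also be scripted. The sketch below is one way to do it on a source string; the module name UpgradeSketch is hypothetical, and it assumes the calls are written exactly as "Tortoise.<function>(" with no alias in between:

```elixir
defmodule UpgradeSketch do
  # Hypothetical helper: rewrites pre-0.9 subscription calls in a source
  # string so that they target Tortoise.Connection instead of Tortoise.
  @functions ~w(subscribe subscribe_sync unsubscribe unsubscribe_sync)

  def rewrite(source) when is_binary(source) do
    Enum.reduce(@functions, source, fn name, acc ->
      String.replace(acc, "Tortoise.#{name}(", "Tortoise.Connection.#{name}(")
    end)
  end
end
```

The rewrite is idempotent, since an already migrated call no longer contains the old "Tortoise.subscribe(" form. To apply it to a file, read it with File.read!/1, pass the contents through UpgradeSketch.rewrite/1, and write the result back with File.write!/2.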

Installation

Tortoise can be installed by adding tortoise to your list of dependencies in mix.exs:

def deps do
  [
    {:tortoise, "~> 0.9"}
  ]
end

Documentation should be available at https://hexdocs.pm/tortoise.

Development

To start developing, run the following commands:

mix deps.get
MIX_ENV=test mix eqc.install --mini
mix test

Building documentation

To build the documentation run the following command in a terminal emulator:

MIX_ENV=docs mix docs

This will build the documentation and place it in the doc folder in the root of the project. These docs will also find their way to the Hexdocs website when the project is published on Hex.

License

Copyright 2018 Martin Gausby

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

tortoise's People

Contributors

fhunleth, gausby, jfcloutier, l1ny4n, lucaong, mariosant, mattludwigs, qcam, rbino, sneako, tanweerdev, thovoll, tohojo, trarbr, voltone

tortoise's Issues

no retry on initial connection

When trying to establish an initial connection, tortoise will die immediately with an error if the connection cannot be established, e.g. due to networking not being available (yet).

        ** (MatchError) no match of right hand side value: {:error, {:shutdown, {:failed_to_start_child, Tortoise.Connection, {{:with_clause, {:error, :nxdomain}}, [{Tortoise.Connection, :do_connect, 2, [file: 'lib/tortoise/connection.ex', line: 340]}, {Tortoise.Connection, :init, 1, [file: 'lib/tortoise/connection.ex', line: 196]}, {:gen_server, :init_it, 2, [file: 'gen_server.erl', line: 365]}, {:gen_server, :init_it, 6, [file: 'gen_server.erl', line: 333]}, {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 247]}]}}}}

This is an issue, as I am attempting to use tortoise in a nerves project, and tortoise often is started before the networking is established. Ideally tortoise would wait and retry (even better with back-off as in #44) instead of crashing.

Add dialyxir to the project

  • Dialyxir should get added as a development dependency
  • All public functions should have specs
  • The project should have all its dialyzer warnings corrected
  • The semaphore-ci setup should get updated to run the dialyzer step

[Feature Request] Tortoise as transport protocol for Phoenix Channels

Pre-requisite

Phoenix channels currently support WebSockets and long polling. However, Phoenix applications are not limited to these; its creators have made it possible to add other transport methods like MQTT, STOMP, HTML5 SSE, etc.

Proposal

Tortoise abstracts the nuances of MQTT making it a breeze to add MQTT in any project. Supporting Phoenix channels will be a step closer towards it.

Further

  • Implementing this is essentially following the behavior outlined in Phoenix.Socket.Transport documentation.

  • I was planning on starting a PR. However, after reading #106, I figured it'd be best to wait for MQTT v5 and WebSocket support before proceeding. I will leave this here for consideration in the future.

IPv6 support

Hello,

Wondering if it is possible to use this library with IPv6? Normal method with Elixir is to pass :inet6 somehow through socket options, but I don't see any way of doing that here.

I have tried setting:

server: {
...
opts: [:inet6],
}

However, this appears to give :gen_tcp.connect an invalid opts value of:

[:binary, {:packet, :raw}, {:active, false}, {:opts, [:inet6]}]

Where it should be (if I understand correctly):

[:binary, {:packet, :raw}, {:active, false}, :inet6]

OK, so what I did is obviously not correct, however my attempts so far to try to get a plain :inet6 value passed have been failures...

Will keep trying...

Thanks.

Incorrect handling of LWT on graceful disconnect?

The LWT is triggered on the broker even when the connection is terminated with the disconnect/1 function:

The Tortoise.Connection is added as a child to the DynamicSupervisor and after stopping all children with:
DynamicSupervisor.stop(Tasks, :shutdown)

The LWT is triggered on the broker (emqx and mosquitto). Also, when calling disconnect/1, I can see that the LWT message is published.
:ok = Tortoise.Connection.disconnect("some_client_id")

Ensure that an unsubscribe package contain at least one topic filter

Currently it is possible to encode an empty list in an unsubscribe package. The Tortoise.Connection.unsubscribe/3-function pattern match will match for a list containing one or more items, but the encode function in Tortoise.Package.Unsubscribe allows for any list. We should consider throwing if the unsubscribe list is empty, and we should update the unsubscribe-package-generator to not generate empty lists in the tests.
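
A minimal sketch of the proposed check, assuming the encoder would raise on an empty list. UnsubscribeSketch and ensure_topics!/1 are hypothetical names, not the actual Tortoise.Package.Unsubscribe code:

```elixir
defmodule UnsubscribeSketch do
  # Hypothetical stand-in for the input check an encoder could perform.
  # A list with at least one element is passed through unchanged.
  def ensure_topics!([_ | _] = topics), do: topics

  # An empty list is rejected before anything is put on the wire.
  def ensure_topics!([]) do
    raise ArgumentError, "an unsubscribe package needs at least one topic filter"
  end
end
```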

Connection to broker is down but Tortoise is not reconnecting

I am using VerneMQ as my broker. Every now and again the connection with the broker is lost for various reasons, none of which are the client or broker being offline (I am not sure why this happens).
All I know is that restarting the application that uses Tortoise reconnects, which means that reconnection is not happening automatically.

It is not clear from the documentation whether one should try to restart the connection upon receiving a :down in the connection/3 callback, or how to do that should that happen.

As I am an elixir newbie could you point me in the right direction as to how to try to reconnect manually?
Or what to log so you can figure out why Tortoise is not reconnecting automatically.

AWS IoT Documentation

Would you be open to accepting a PR with documentation on how to use Tortoise with AWS IoT? I recently worked through how to do this and think it could be a short section in the README.md or elsewhere. If you'd like this, then where would you prefer the documentation to live?

Remove the defdelegate's to subscription functions from the Tortoise-module

The main Tortoise-module currently implements defdelegates to subscribe/3 and subscribe_sync/3, both pointing to Tortoise.Connection.

I think having them on Tortoise.Connection is good enough. While it is great to be able to create subscriptions during the connection life cycle, it would be best (and easiest) if people were guided to start connections with the subscriptions specified, using the subscriptions field on the connection specification. The handler also supports changing the subscriptions with the next-states described in the handler documentation.

It is not a big deal to remove, but it would require some communication: Basically we need to bump the version number, and communicate that people need to search and replace Tortoise.subscribe( -> Tortoise.Connection.subscribe(, etc; and update the documentation.

Tortoise.publish can hang forever

Hello,

Under certain circumstances Tortoise.publish() can hang forever. Even specifying a timeout value of 10 does not help.

The reproducible test case I (accidentally) found is connecting to an MQTT server using Tortoise.Transport.SSL where the server is not configured to use SSL.

I would expect to get an error in this situation, or an immediate return with no error (as QoS=0), not an indefinite hang.

Regards

Unclear failure mode, failing to connect to AWS IoT using Tortoise.Transport.SSL config

Hi there, I'm new to elixir and I'm running into an issue using Tortoise with the SSL configs against an AWS IoT broker.

My workflow so far has been:

  1. Setup a mosquitto broker ✅
  2. Send messages to the broker using Tortoise ✅
  3. Configure the broker in bridge mode, add valid authenticated certificates, and proxy the requests to AWS IoT. (Mosquitto running in docker, elixir running native on mac, events are successfully ingested into AWS.) ✅
  4. Attempt to configure Tortoise to use the same certificates to connect to the same AWS IoT endpoint. ❌

Any hints at what this problem looks like? (I'm worried it might be a firewall issue.)

    Tortoise.Supervisor.start_child(
      client_id: config.client_id,
      version: config.version,
      handler: {Tortoise.Handler.Logger, []},
      server: {
        Tortoise.Transport.SSL,
        host: config.host,
        port: config.port,
        certfile: './config/client-certificate.pem.key',  # tried using either `cert` or `certfile`
        keyfile: './config/client-private.pem.key',
        cacertfile: config.cacertfile,
        protocol_version: :"tlsv1.2"
      },
      subscriptions: [{"#", 0}]
    )

Here's the relevant part of the current stack trace when it fails:

[error] GenServer {Tortoise.Registry, {Tortoise.Connection, nil}} terminating
** (WithClauseError) no with clause matching: {:error, {:options, {:socket_options, [protocol_version: :"tlsv1.2", packet_size: 0, packet: 0, header: 0, active: false, mode: :binary]}}}
    (tortoise) lib/tortoise/connection.ex:423: Tortoise.Connection.do_connect/2
    (tortoise) lib/tortoise/connection.ex:261: Tortoise.Connection.handle_info/2
    (stdlib) gen_server.erl:637: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:711: :gen_server.handle_msg/6
    (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Last message: :connect

...

    ** (EXIT) time out
    (elixir) lib/gen_server.ex:836: GenServer.call/3
    lib/reverie/cli.ex:54: anonymous fn/1 in Reverie.Cli.handle_chunk/1
    (elixir) lib/enum.ex:737: Enum."-each/2-lists^foreach/1-0-"/2
    (elixir) lib/enum.ex:737: Enum.each/2
    lib/reverie/cli.ex:53: Reverie.Cli.handle_chunk/1
    (elixir) lib/stream.ex:419: anonymous fn/4 in Stream.each/2
    (elixir) lib/stream.ex:1524: anonymous fn/3 in Enumerable.Stream.reduce/3
    (elixir) lib/stream.ex:263: Stream.after_chunk_while/2

Tag: @connorjacobsen

Change `publish_sync/5` to `publish_sync/4`

I think the interface would be much prettier if we moved the timeout into the opts in the publish_sync; the default should still be :infinity

lib/tortoise.ex:22:15:  defdelegate publish_sync(client_id, topic, payload \\ nil, opts \\ [], timeout \\ :infinity),
lib/tortoise/connection.ex:81:7:  def publish_sync(client_id, topic, payload \\ nil, opts \\ [], timeout \\ :infinity) do
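
A minimal sketch of how a publish_sync/4 could pull the timeout out of its opts while keeping :infinity as the default. PublishOptsSketch and split_timeout/1 are hypothetical names used only for illustration:

```elixir
defmodule PublishOptsSketch do
  # Hypothetical helper: separates the caller-side timeout from the
  # options that describe the publish itself (qos, retain, and so on).
  # Returns {timeout, remaining_opts}; the timeout defaults to :infinity.
  def split_timeout(opts) when is_list(opts) do
    Keyword.pop(opts, :timeout, :infinity)
  end
end
```

With this shape a caller would write something like publish_sync(client_id, topic, payload, qos: 1, timeout: 5000), and the function body would split the options before publishing.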

Cannot set clean_session false when start a connection

I want to set clean_session to false so that the broker keeps messages for the client while it is offline, but it seems that clean_session is hard-coded to true and I cannot change it.

connect = %Package.Connect{
    client_id: client_id,
    user_name: Keyword.get(connection_opts, :user_name),
    password: Keyword.get(connection_opts, :password),
    keep_alive: Keyword.get(connection_opts, :keep_alive, 60),
    will: Keyword.get(connection_opts, :will),
    # if we re-spawn from here it means our state is gone
    clean_session: true #HERE
}
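
One possible shape for making this configurable is sketched below. It builds a plain map rather than the real %Package.Connect{} struct, and the :clean_session option name is an assumption. Note that the comment in the snippet above suggests the hard-coded true is deliberate for the re-spawn case, so a real change would need to account for that:

```elixir
defmodule ConnectOptsSketch do
  # Hypothetical: mirrors the fields of the snippet above in a plain map.
  def build(client_id, connection_opts) do
    %{
      client_id: client_id,
      user_name: Keyword.get(connection_opts, :user_name),
      password: Keyword.get(connection_opts, :password),
      keep_alive: Keyword.get(connection_opts, :keep_alive, 60),
      will: Keyword.get(connection_opts, :will),
      # Assumption: default to true so existing behaviour is preserved
      clean_session: Keyword.get(connection_opts, :clean_session, true)
    }
  end
end
```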

Update the documentation with valid client ids

The documentation and usage examples use client ids that the spec does not guarantee a server will accept:

The Server MUST allow ClientID’s which are between 1 and 23 UTF-8 encoded bytes in length, and that contain only the characters "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"

Seemingly most MQTT servers are very liberal with what they accept when it comes to client ids, so I haven't seen any complaints in the wild. Still, it would be best to show something that would work for sure in the examples.

Examples include:

  • Elixir module atoms, such as MyClientId, which expands to Elixir.MyClientId and thus contains a dot
  • Some examples use dashes
  • and so forth

I guess a good structure to use is something like :client4212.
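
The rule quoted above can be checked mechanically. The following is a minimal sketch; ClientIdSketch and portable?/1 are hypothetical names, and the check only covers the set every server must accept, so a false result means "not guaranteed", not "rejected by every broker":

```elixir
defmodule ClientIdSketch do
  # Binaries: 1 to 23 bytes, digits and ASCII letters only.
  def portable?(client_id) when is_binary(client_id) do
    Regex.match?(~r/\A[0-9a-zA-Z]{1,23}\z/, client_id)
  end

  # Atoms are checked by their string form, so a module alias such as
  # MyClientId is seen as "Elixir.MyClientId" and fails on the dot.
  def portable?(client_id) when is_atom(client_id) do
    client_id |> Atom.to_string() |> portable?()
  end
end
```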

Store transport+socket in an ETS table outside of the Transmitter process

Right now a publish (plus others) will request a socket from the transmitter process, receive it, and send messages onto the wire. If the socket was stored outside of the process, in a protected ETS table, it would be possible to obtain the socket without going through a process mailbox.

Also, it would be possible to answer the question: does the connection exist? Right now we will just hang if a publish_sync is made to a non-existent connection.
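
A minimal sketch of the idea using a named, protected ETS table. The module name, table name, and stored tuple shape are assumptions made for illustration; the real design may differ:

```elixir
defmodule SocketTableSketch do
  @table :socket_table_sketch

  # Must be called from the process that should own the table. With
  # :protected only the owner can write, but any process can read.
  def init do
    :ets.new(@table, [:named_table, :protected, :set, read_concurrency: true])
    :ok
  end

  # Called by the owner when a connection is established.
  def put(client_id, transport, socket) do
    true = :ets.insert(@table, {client_id, {transport, socket}})
    :ok
  end

  # Called by any process; never blocks on a process mailbox.
  def fetch(client_id) do
    case :ets.lookup(@table, client_id) do
      [{^client_id, connection}] -> {:ok, connection}
      [] -> {:error, :unknown_connection}
    end
  end
end
```

Because fetch/1 returns {:error, :unknown_connection} immediately for a missing entry, a caller can fail fast instead of waiting on a connection that does not exist.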

Settle on an interface for the "Driver" behaviour

When an event happens (such as the client receiving a message) a callback will get called in a user-defined callback module. This module has to implement the Tortoise.Driver behaviour.

Currently the events triggered are when a ping response is received (allowing the user to log the duration of the ping somehow), and when one of the subscribed topics receives a message. We need to specify which events are interesting for the end-user, and the names for the callback functions. See the Tortoise.Driver.Logger module for an implementation of a driver.

Subject for discussion:

  • What events are interesting?
  • What should the callbacks be named?

Socket error on client, disconnecting

I'm trying to connect to a mosquitto broker with MQTT v3.1

On mosquitto logs it appears as:

1549733781: New client connected from 127.0.0.1 as sunflowex (c1, k60).
1549733781: Socket error on client sunflowex, disconnecting.

And on console the log says:

18:39:26.106 [info]  Application sunflowex exited: an exception was raised:
    ** (CaseClauseError) no case clause matching: {:error, :badarg}
        (tortoise) lib/tortoise/connection/receiver.ex:37: Tortoise.Connection.Receiver.handle_socket/2
        (tortoise) lib/tortoise/connection.ex:623: Tortoise.Connection.init_connection/2
        (tortoise) lib/tortoise/connection.ex:386: Tortoise.Connection.handle_info/2
        (stdlib) gen_server.erl:616: :gen_server.try_dispatch/4
        (stdlib) gen_server.erl:686: :gen_server.handle_msg/6
        (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3

The code:

Tortoise.Supervisor.start_child(
      client_id: "sunflowex",
      handler: {Sunflowex.MqttClient, []},
      server: {Tortoise.Transport.Tcp, host: "127.0.0.1", port: 1883},
      subscriptions: [{"engine/#", 1}])

I don't know what can be happening because it was working a week ago 🤔

(CaseClauseError) no case clause matching: {:error, :badarg}

Hi there,

I'm trying to get Tortoise to run for publishing to a MQTT server.
If I simply try in iex:
{ok, _pid} = Tortoise.Connection.start_link( client_id: "HelloWorld", server: {Tortoise.Transport.Tcp, host: "localhost", port: 1883}, handler: {Tortoise.Handler.Logger, []})

I'm getting:


19:20:02.017 [info]  Initializing handler
 
19:20:02.018 [error] GenStateMachine {Tortoise.Registry, {Tortoise.Connection.Inflight, "HelloWorld"}} terminating
** (ErlangError) Erlang error: {:bad_action_from_state_function, {:next_events, :internal, :post_init}}
    (stdlib) gen_statem.erl:1299: :gen_statem.parse_actions_next_event/7
    (stdlib) gen_statem.erl:1194: :gen_statem.loop_event_actions_list/10
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Callback mode: :handle_event_function
Last event: {:internal, :init_state}
State: {:disconnected, %Tortoise.Connection.Inflight{client_id: "HelloWorld", order: [], pending: %{}}}
 
19:20:02.019 [info]  Initializing handler
** (EXIT from #PID<0.367.0>) shell process exited with reason: an exception was raised:
    ** (CaseClauseError) no case clause matching: {:error, :badarg}
        (tortoise) lib/tortoise/connection/receiver.ex:37: Tortoise.Connection.Receiver.handle_socket/2
        (tortoise) lib/tortoise/connection.ex:623: Tortoise.Connection.init_connection/2
        (tortoise) lib/tortoise/connection.ex:386: Tortoise.Connection.handle_info/2
        (stdlib) gen_server.erl:616: :gen_server.try_dispatch/4
        (stdlib) gen_server.erl:686: :gen_server.handle_msg/6
        (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
         
Interactive Elixir (1.6.4) - press Ctrl+C to exit (type h() ENTER for help)
iex(15)> 
19:20:02.020 [error] GenServer {Tortoise.Registry, {Tortoise.Connection, "HelloWorld"}} terminating
** (CaseClauseError) no case clause matching: {:error, :badarg}
    (tortoise) lib/tortoise/connection/receiver.ex:37: Tortoise.Connection.Receiver.handle_socket/2
    (tortoise) lib/tortoise/connection.ex:623: Tortoise.Connection.init_connection/2
    (tortoise) lib/tortoise/connection.ex:386: Tortoise.Connection.handle_info/2
    (stdlib) gen_server.erl:616: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:686: :gen_server.handle_msg/6
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Last message: :connect
State: %Tortoise.Connection{backoff: %Tortoise.Connection.Backoff{max_interval: 30000, min_interval: 100, value: nil}, client_id: "HelloWorld", connect: %Tortoise.Package.Connect{__META__: %Tortoise.Package.Meta{flags: 0, opcode: 1}, clean_session: true, client_id: "HelloWorld", keep_alive: 60, password: nil, protocol: "MQTT", protocol_version: 4, user_name: nil, will: nil}, keep_alive: nil, opts: [client_id: "HelloWorld", handler: {Tortoise.Handler.Logger, []}], server: %Tortoise.Transport{host: 'localhost', opts: [:binary, {:packet, :raw}, {:active, false}], port: 1883, type: Tortoise.Transport.Tcp}, status: :down, subscriptions: %Tortoise.Package.Subscribe{__META__: %Tortoise.Package.Meta{flags: 2, opcode: 8}, identifier: nil, topics: []}}
 
19:20:02.021 [error] GenStateMachine {Tortoise.Registry, {Tortoise.Connection.Inflight, "HelloWorld"}} terminating
** (ErlangError) Erlang error: {:bad_action_from_state_function, {:next_events, :internal, :post_init}}
    (stdlib) gen_statem.erl:1299: :gen_statem.parse_actions_next_event/7
    (stdlib) gen_statem.erl:1194: :gen_statem.loop_event_actions_list/10
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Callback mode: :handle_event_function
Last event: {:internal, :init_state}
State: {:disconnected, %Tortoise.Connection.Inflight{client_id: "HelloWorld", order: [], pending: %{}}}

Does anyone have an idea what's wrong here?

Make a `ping_sync/2` function on the controller and call that from `Tortoise.Connection`

The Connection process will call a function called ping/1 on the Tortoise.Connection.Controller-module. This is done to ensure the client adheres to the "keep alive", which is an interval configured in the connect message. The connection process will enter a selective receive listening for the reply from the ping; this is done so we can terminate the process if the ping response times out (the specification defines a "reasonable timeout").

To clean things up a bit I propose making a public function on Tortoise.Connection.Controller called ping_sync/2; the first argument is the client_id, the second one is a timeout, which could have a default of :infinity. The function should just call the ping/1 function and enter the selective receive with an after clause that returns {:error, :timeout}. The Tortoise.Connection module should call this function instead.
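
The selective receive with an after clause could look roughly like the sketch below. It is an illustration only: PingSketch is a hypothetical module, it takes a pid instead of a client_id, and the {:ping, caller, ref} and {:pong, ref} message shapes are assumptions rather than the controller's actual protocol:

```elixir
defmodule PingSketch do
  # Hypothetical: `controller` is any process that answers
  # {:ping, caller, ref} with {:pong, ref}.
  def ping_sync(controller, timeout \\ :infinity) do
    ref = make_ref()
    send(controller, {:ping, self(), ref})

    # Selective receive: only the reply carrying our ref is matched,
    # other messages stay in the mailbox.
    receive do
      {:pong, ^ref} -> :ok
    after
      timeout -> {:error, :timeout}
    end
  end
end
```

Tagging the request with a unique reference keeps a late reply to an earlier ping from being mistaken for the current one.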

Weird output from the test suite (not critical)

The following message is sometimes seen in the test output. It does not cause the test suite to fail, but it might still be a concern that should be addressed.

21:49:48.194 [error] ** State machine {Registry.Tortoise, {Tortoise.Connection.Receiver, "test successful connect reconnect with present state"}} terminating
** Last event = {:internal, :activate_socket}
** When server state  = {{:connected, :receiving_fixed_header}, %Tortoise.Connection.Receiver{buffer: "", client_id: "test successful connect reconnect with present state", socket: #Port<0.8013>}}
** Reason for termination = :error:{:badmatch, {:error, :einval}}
** Callback mode = :handle_event_function
** Queued = [internal: :consume_buffer]
** Stacktrace =
**  [{Tortoise.Connection.Receiver, :handle_event, 4, [file: 'lib/tortoise/connection/receiver.ex', line: 77]}, {:gen_statem, :call_state_function, 5, [file: 'gen_statem.erl', line: 1635]}, {:gen_statem, :loop_event_state_function, 6, [file: 'gen_statem.erl', line: 1023]}, {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 247]}]

(Basically I just leave it here so I have it jotted down so I can focus on it at a later stage)

The test "test passive connection get connection" fails once in a while

  1) test passive connection get connection (Tortoise.EventsTest)
     test/tortoise/events_test.exs:34
     match (=) failed
     code:  assert [:connection] = Registry.keys(Tortoise.Events, child)
     right: []
     stacktrace:
       test/tortoise/events_test.exs:48: (test)

Also:

  1) test active connection get connection (Tortoise.EventsTest)
     test/tortoise/events_test.exs:64
     match (=) failed
     code:  assert [:connection] = Registry.keys(Tortoise.Events, child)
     right: []
     stacktrace:
       test/tortoise/events_test.exs:87: (test)

Create a `Tortoise.Application`-module

Create a Tortoise.Application-module and move the application behaviour related stuff from the Tortoise-module into it. Remember to change the application start in the mix.exs file as well!

This will make it easier to document the project as the user can look for the (upcoming) documentation about starting and stopping of the application there.

Retransmit inflight messages when reconnecting

Retransmit inflight messages in cases where we are thrown off the server and reconnect with a present session; most of the logic should be there, as the inflight tracking data-structure has support for rolling back to the previous state; when the connection is reset it should rollback the current inflight messages and retry them from that point on the new connection.

Implement `gen_statem`-style "next" actions for the user defined callback module

As such the user defined "handler" callback module should do as little as possible, and shouldn't block as it is running in the same process as the controller (blocking it will make the client unable to process protocol messages)—but for some cases it would make sense to be able to tell the handler to perform some actions, such as {:subscribe, topic}, {:unsubscribe, topic}, :disconnect, etc in a gen_statem-style next actions list given in the response tuple.

Example:

  def subscription(:down, "baz/foo", state) do
    next_actions = [{:subscribe, "foo/bar"}]
    {:ok, state, next_actions}
  end

The controller should be able to react to these messages and tell the connection to perform the subscriptions, unsubscriptions, or disconnection.

Publish to Hex.pm

This has to wait for a bit. The project is usable, but sort of half baked, and some interfaces will change a bit before we can put it on Hex.

Todo:

  • settle on the user facing interface
  • make it more stable
  • write documentation

`crash_reason : options: key` when trying to connect with SSL

We're trying to connect to a broker with SSL.
We're basically doing:

 transport = {
       Tortoise.Transport.SSL,
       host: ".....",
       port: 8883,
       cacertfile: ["path/to/cacert.pem"],
       keyfile: "/path/to/key.pem",
       certfile: "/path/to/cert.pem",
       verify: :verify_none
     }

{:ok, pid} = Tortoise.Connection.start_link(
      client_id: "....",
      server: transport,
      user_name: "....",
      password: "....",
      handler: {OurApp.Handler, :ok}
    )

When we do things with path to files, we're able to connect.

However, when we try to do the same thing with strings that represent the base64 version of the DER-encoded files for cacerts, key, and cert, we get an error:

 transport = {
       Tortoise.Transport.SSL,
       host: ".....",
       port: 8883,
       cacert: [@cacert_der_base64 |> Base.decode64!()],
       keyfile: @key_der_base64 |> Base.decode64!(),
       certfile:@cert_der_base64 |> Base.decode64!(),
       verify: :verify_none
     }

{:ok, pid} = Tortoise.Connection.start_link(
      client_id: "....",
      server: transport,
      user_name: "....",
      password: "....",
      handler: {OurApp.Handler, :ok}
    )

Then the connection fails, and the Tortoise.Connection server crashes with this error:

ex(1)> [20:36:11.957] [error] <GenServer {Tortoise.Registry, {Tortoise.Connection, OurApp.Handler}} terminating
** (stop) {:options, {:key, <<48, 130, ...>>}}
Last message: :connect
State: %Tortoise.Connection{backoff: %Tortoise.Connection.Backoff{max_interval: 30000, min_interval: 100, value: 100}, client_id: OurApp.Handler, connect: %Tortoise.Package.Connect{__META__: %Tortoise.Package.Meta{flags: 0, opcode: 1}, clean_session: true, client_id: OurApp.Handler, keep_alive: 60, password: "...", protocol: "MQTT", protocol_version: 4, user_name: "...", will: nil}, keep_alive: nil, opts: [client_id: "...", handler: {OurAppHandler, :ok}], server: %Tortoise.Transport{host: '...', opts: [:binary, {:packet, :raw}, {:active, false}, {:key, <<48, 130, 4,  ...>>}, {:cert, <<48, 130, 5, , ...>>}, {:cacerts, [<<48, 130, 4, 7,  ...>>]}, {:verify, :verify_peer}], port: 8883, type: Tortoise.Transport.SSL}, status: :down, subscriptions: %Tortoise.Package.Subscribe{__META__: %Tortoise.Package.Meta{flags: 2, opcode: 8}, identifier: nil, topics: []}}> [
  crash_reason: {{:options,
    {:key,
     <<48, 130, 4,  ...>>}}, []},
  function: "error_info/7",
  module: :gen_server,
  line: 888,
  file: "gen_server.erl",
  pid: #PID<0.284.0>
]

What could this error mean? Is it possible that we need to convert the binary to something else?
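
The {:options, {:key, <<...>>}} reason indicates that the key option itself was rejected. Erlang's ssl application expects an in-memory private key as a {type, der} tuple rather than a bare binary, while cert takes a DER binary and cacerts a list of DER binaries. A minimal sketch of building such options follows; DerOptsSketch is a hypothetical helper, the key type must match the actual key, and it is an assumption that the transport passes these options through to ssl unchanged:

```elixir
defmodule DerOptsSketch do
  # Hypothetical helper. `key_type` must match the actual key, for
  # example :RSAPrivateKey, :ECPrivateKey or :PrivateKeyInfo.
  def ssl_opts(cacert_b64, cert_b64, key_b64, key_type \\ :RSAPrivateKey) do
    [
      # List of DER-encoded CA certificates
      cacerts: [Base.decode64!(cacert_b64)],
      # DER-encoded client certificate
      cert: Base.decode64!(cert_b64),
      # DER-encoded private key, tagged with its type
      key: {key_type, Base.decode64!(key_b64)}
    ]
  end
end
```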

Implement a disconnect/1 function

I read through the specification, and one of the things missing is a way to shut down a connection cleanly. This would entail sending a disconnect package to the server.

Because this is the place I go to over-engineer stuff I think about the following.

  1. The disconnect/1 command should sit on Tortoise.Connection and it should take a client_id as an argument
  2. When executed the disconnect will send a :shutdown status on the Tortoise.Events pubsub, which the inflight manager will receive, at which point it should stop accepting packages to track. In other words, the connection should stop accepting subscribe, unsubscribe and publish messages with QoS>0. Notice that QoS=0 packages will still be handled at this point; this is okay as there are no delivery guarantees for them anyway
  3. The inflight manager will now complete the inflight messages (in reality there would be very few, but for some extremely slow connections this might make sense…). When the pending inflight messages have been handled it should somehow communicate to the rest of the system that we are ready for shutdown
  4. We should send the disconnect command
  5. The connection process should shut itself and its children down

I should consider sending errors such as {:error, :connection_terminating} to processes waiting for a response on inflight messages if they are not completed in a reasonable time. Also, processes that try to put messages on the wire after the termination has started could get the same error.
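The shutdown steps above can be sketched as a small pure state machine. This is only an illustration of the idea, not Tortoise's implementation: the module name, the struct fields and the return tuples are all assumptions made for the example.

```elixir
defmodule DisconnectSketch do
  # Hypothetical state: `status` is :up or :terminating, `inflight` holds
  # the identifiers of tracked (QoS > 0) messages still awaiting completion.
  defstruct status: :up, inflight: MapSet.new()

  def new, do: %__MODULE__{}

  # While up, new messages are accepted for tracking.
  def track(%__MODULE__{status: :up} = state, id) do
    {:ok, %{state | inflight: MapSet.put(state.inflight, id)}}
  end

  # Once shutdown has begun, tracked messages are refused.
  def track(%__MODULE__{status: :terminating}, _id) do
    {:error, :connection_terminating}
  end

  # Step 2: stop accepting packages; report whether anything is left to drain.
  def begin_shutdown(%__MODULE__{} = state) do
    next = %{state | status: :terminating}

    if MapSet.size(next.inflight) == 0 do
      {:ready_for_disconnect, next}
    else
      {:draining, next}
    end
  end

  # Step 3: when the last inflight message completes during shutdown,
  # signal that the disconnect package can be sent.
  def complete(%__MODULE__{} = state, id) do
    next = %{state | inflight: MapSet.delete(state.inflight, id)}

    if next.status == :terminating and MapSet.size(next.inflight) == 0 do
      {:ready_for_disconnect, next}
    else
      {:pending, next}
    end
  end
end

state = DisconnectSketch.new()
{:ok, state} = DisconnectSketch.track(state, 1)
{:draining, state} = DisconnectSketch.begin_shutdown(state)
{:error, :connection_terminating} = DisconnectSketch.track(state, 2)
{:ready_for_disconnect, _state} = DisconnectSketch.complete(state, 1)
```

A timeout for messages that never complete is left out here; it would sit on top of the :draining state.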

Implement incremental backoff when trying to reconnect to broker

If the broker goes down and we drop the connection Tortoise will attempt to reconnect. If the reconnect fails it will give up. It should continue to attempt getting a connection. I suggest an incremental backoff which can be configured per connection.

The operator should be able to reconnect, so the Connection.renew/1 function should get documented. It should also be possible to change the connection specification, so we can log on to a node running Tortoise and update the IP it uses for its broker.
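A minimal sketch of what a per-connection incremental backoff could look like. The doubling strategy, the module name and the function names are assumptions for illustration; only the min_interval and max_interval field names echo the Backoff struct visible in the crash report earlier on this page.

```elixir
defmodule BackoffSketch do
  # `value` is nil until the first retry has been scheduled.
  defstruct min_interval: 100, max_interval: 30_000, value: nil

  def new(opts \\ []), do: struct(__MODULE__, opts)

  # The first attempt waits min_interval milliseconds.
  def next(%__MODULE__{value: nil} = backoff) do
    {backoff.min_interval, %{backoff | value: backoff.min_interval}}
  end

  # Later attempts double the previous delay, capped at max_interval.
  def next(%__MODULE__{} = backoff) do
    value = min(backoff.value * 2, backoff.max_interval)
    {value, %{backoff | value: value}}
  end

  # Call after a successful connect so the next outage starts small again.
  def reset(%__MODULE__{} = backoff), do: %{backoff | value: nil}
end

backoff = BackoffSketch.new(min_interval: 100, max_interval: 1_000)
{delays, _backoff} = Enum.map_reduce(1..6, backoff, fn _, b -> BackoffSketch.next(b) end)
IO.inspect(delays)
# prints [100, 200, 400, 800, 1000, 1000]
```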

Implement `Pipe.await`

Pipes were introduced as a data structure that allows us to pour data into the network socket so we don't need to copy it to the Transmitter process. This was intended for situations where massive amounts of QoS 0 packages need to be sent, and it fits well because there is no state to keep track of: QoS 0 is fire and forget, so there is no need to track it in the inflight process.

This creates some confusion though, because now we have two ways of publishing a message. We could extend the pipe to include "pending": a list that keeps the references created by publishing a QoS 1 or QoS 2 message. When the pipe is passed to Pipe.await/2 it would block the process with a selective receive until all the messages in "pending" have been received. It would still need to go through the inflight process (because this is where we store the message until it is delivered, and it is the sole authority on creating identifiers for a given session), but it would normalise the interface and make it possible to set up a dispatch of multiple messages with a QoS and later wait for them to be resolved.

{:ok, pipe} = Tortoise.Connection.Transmitter.subscribe_await("my_client_id")
pipe = Tortoise.publish(pipe, "foo/bar", "hello", qos: 2)
pipe = Tortoise.publish(pipe, "foo/bar", "hello", qos: 1)
pipe = Tortoise.publish(pipe, "foo/bar", "hello", qos: 0)
pipe = Tortoise.publish(pipe, "foo/bar", "hello", qos: 2)
Tortoise.await(pipe) # block until all messages in the pipe have been resolved

Interface might change before this gets implemented! This is just a ramble of an idea.
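To make the "selective receive over pending references" idea concrete, here is a hedged sketch. The {:delivered, ref} message shape, the module name and the per-message timeout are assumptions for the example; the real inflight process may report completion differently.

```elixir
defmodule AwaitSketch do
  # Blocks until a {:delivered, ref} message has arrived for every pending
  # reference. The timeout applies to each message individually.
  def await(pending, timeout \\ 5_000)

  def await([], _timeout), do: :ok

  def await([ref | rest], timeout) do
    receive do
      # The pin makes this a selective receive: other messages stay queued.
      {:delivered, ^ref} -> await(rest, timeout)
    after
      timeout -> {:error, :timeout}
    end
  end
end

parent = self()
refs = for _ <- 1..3, do: make_ref()

# Simulate acknowledgements arriving in a different order than published.
for ref <- Enum.reverse(refs), do: send(parent, {:delivered, ref})

:ok = AwaitSketch.await(refs)
```

Because each receive only matches its own reference, the order in which acknowledgements arrive does not matter.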

Add documentation

The project needs usage documentation. The following is a list of sections and stuff that needs to be documented.

  • Add ex_doc to the project
  • The Tortoise module
  • The Tortoise.App module
  • The Tortoise.Pipe module; passive pipes, active pipes, failure modes and configuration
  • A page about quality of service and how it maps to Tortoise
  • Connection to the broker
  • Connection supervision

Consider what to do when subscriptions are rejected or otherwise fail

When subscribing to a topic filter the broker can choose to accept the subscription with a different quality of service than the one requested. Right now the subscription returns a map as a result with the keys:

  • :ok for the topics that got accepted with the requested QoS,
  • :warn for the topics that got accepted with a different QoS than requested, and
  • :error for the topics that returned an access-denied error

We should think about what should happen in cases where a subscription results in a warning or an error. Should we crash? After all, the expectations were not met. I suggest that we could make this a config option.
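As a sketch of how such a result map and a strictness option could fit together: the function names, the {:ok, qos} / {:error, reason} shape of the broker answers, and the :strict / :lenient modes are all assumptions for illustration, not the current Tortoise API.

```elixir
defmodule SubackSketch do
  # Pairs each requested {topic, qos} with the broker's answer and groups
  # the pairs under :ok, :warn or :error.
  def classify(requested, results) do
    groups =
      requested
      |> Enum.zip(results)
      |> Enum.group_by(&status/1, &entry/1)

    Map.merge(%{ok: [], warn: [], error: []}, groups)
  end

  # Hypothetical config option: :strict turns any warning or error into a
  # failure the caller can act on (for example by crashing).
  def enforce(%{warn: [], error: []} = result, _mode), do: {:ok, result}
  def enforce(result, :lenient), do: {:ok, result}
  def enforce(result, :strict), do: {:error, {:subscription_not_met, result}}

  # Using `qos` twice in the pattern requires requested and granted to match.
  defp status({{_topic, qos}, {:ok, qos}}), do: :ok
  defp status({{_topic, _requested}, {:ok, _granted}}), do: :warn
  defp status({_request, {:error, _reason}}), do: :error

  defp entry({{topic, _qos}, {:ok, granted}}), do: {topic, granted}
  defp entry({{topic, _qos}, {:error, reason}}), do: {topic, reason}
end

result =
  SubackSketch.classify(
    [{"a", 1}, {"b", 2}, {"c", 0}],
    [{:ok, 1}, {:ok, 1}, {:error, :access_denied}]
  )

IO.inspect(result)
IO.inspect(SubackSketch.enforce(result, :strict))
```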

The user defined callback module has a callback called subscription which will get a :up message when a subscription is accepted, and a :down when it is unsubscribed. This could get an extra, such as :denied; this would require the user to implement extra stuff, which wouldn't be that bad because the user should think about what to do in this situation.

This needs some R&D.

Logger handler does not handle connection :terminated

I see the Logger handler handles :terminating but not :terminated.

https://github.com/gausby/tortoise/blob/master/lib/tortoise/handler/logger.ex#L26

Actually might help if these states were documented better. The example in README.md only lists :up and :down as valid options. Presumably :terminating happens first, followed by :terminated, followed by :down? Not really sure.

I noticed my code generating the following error, which is why I chose to investigate:

** (FunctionClauseError) no function clause matching in Robotica.Client.connection/2 
    (robotica) lib/robotica/client.ex:16: Robotica.Client.connection(:terminated, %Robotica.Client.State{})
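Until the states are documented, a handler can protect itself with a catch-all clause. The sketch below is a plain module, not one that uses the Tortoise.Handler behaviour, and the {:ok, state} return value is an assumption about what the callback should return; the status atoms are the ones mentioned in this issue.

```elixir
defmodule HandlerSketch do
  require Logger

  # Explicit clauses for the statuses the application cares about.
  def connection(:up, state) do
    Logger.info("connection is up")
    {:ok, state}
  end

  def connection(:down, state) do
    Logger.warning("connection is down")
    {:ok, state}
  end

  # Catch-all so unexpected statuses such as :terminating or :terminated
  # do not raise a FunctionClauseError.
  def connection(status, state) do
    Logger.debug("unhandled connection status: #{inspect(status)}")
    {:ok, state}
  end
end

{:ok, :my_state} = HandlerSketch.connection(:terminated, :my_state)
```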

Regards

Implement the MQTT 5 protocol

Currently the client supports MQTT 3.1.1.

Look into what has changed from version 3.1.1 to version 5 and start implementing the new stuff.

Coerce the input given to the transports to charlists

The transports expect charlists as input for some keys; I think it would improve usability of the transports if it was possible to pass in binaries/strings, and let the init coerce the input to charlists where charlists are expected.
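A minimal sketch of such a coercion step, assuming the options arrive as a keyword list. The module name and the list of keys that expect charlists are assumptions for the example; the real set depends on the transport.

```elixir
defmodule CoerceSketch do
  # Assumed set of option keys whose values must be charlists.
  @charlist_keys [:host, :cacertfile, :certfile, :keyfile]

  # Converts binaries under the known keys to charlists and leaves every
  # other option (including values that already are charlists) untouched.
  def coerce(opts) do
    Enum.map(opts, fn
      {key, value} when key in @charlist_keys and is_binary(value) ->
        {key, String.to_charlist(value)}

      other ->
        other
    end)
  end
end

IO.inspect(CoerceSketch.coerce(host: "localhost", port: 1883))
```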

New Websocket transport

Hey 👋

First of all thank you for this amazing library, it is really well written and organised, and you can learn a lot just by scrolling around.

I'm interested in having an MQTT library with support for Websockets. A colleague and I already did some work on Hulaaki to try to achieve it, but it felt a bit hackish, so I looked into this library to see if it would be better suited for it. The Tortoise.Transport behaviour is tightly coupled to the gen_tcp module API, so I'm developing a library that wraps a Websocket client (gun) and provides an API similar to gen_tcp. The problem now is testing, since the Tortoise.Transport behaviour requires some functions that are essentially used for testing and not for the normal functioning of the client. Basically I would have to implement a Websocket server (or use a third party library) to listen and accept connections for testing.

I would like to know if you would be interested in having support for a Websocket transport in this library and, if so, whether you have any insights on how it could be achieved (is refactoring the Tortoise.Transport behaviour an option?).

EXIT no process on nerves

I am using {:tortoise, "~> 0.9"} in a Nerves project. When I boot the device into the shell and try a simple connection over TCP with:

defmodule MMod.Mqtt do
  require Logger
  use GenServer

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, :ok, opts)
  end

  def init(:ok) do
    IO.puts("starting mqtt")

    {:ok, pid} =
      Tortoise.Supervisor.start_child(
        client_id: Application.get_env(:hello_network, :client_id),
        handler: {Tortoise.Handler.Logger, []},
        server: {
          Tortoise.Transport.Tcp,
          host: Application.get_env(:hello_network, :mqtt_host),
          port: String.to_integer(Application.get_env(:hello_network, :port))
        }
      )

    {:ok, pid}
  end

  def handle_call({:simple_call}, from, state) do
    {:reply, from, state}
  end

  # handle_cast/2 receives only the message and the state
  def handle_cast({:simple_cast}, _state) do
    {:noreply, []}
  end

  # publish a message whenever any other message arrives
  def handle_info(_msg, state) do
    Tortoise.publish(
      Application.get_env(:hello_network, :client_id),
      Application.get_env(:hello_network, :topic),
      ":mymsg",
      qos: 2
    )

    {:noreply, state}
  end
end

The configuration values being fetched are:

client_id : "my_client"
mqtt_host: "iot.eclipse.org"
port: 1883

I am getting this:

{:error,
{:noproc,
{GenServer, :call,
[
Tortoise.Supervisor,
{:start_child,
{{ Tortoise.Connection, :start_link,
[
[
client_id: "my_first_client",
handler: {Tortoise.Handler.Logger,[]},
server: {Tortoise.Transport.Tcp,[ host: "iot.eclipse.org", port:1883]}
]
]}, transient, 5000, :worker,[Tortoise.Connection] }},
:infinity
]}}}
** (EXIT from #PID<0.964.0> shell process exited with reason: exited in: GenServer.call(Tortoise.Supervisor,{:start_child, {Tortoise.Connection, :start_link, [[client_id: "my_client_id", handler: {Tortoise.Handler.Logger, []},server: {Tortoise.Transport.Tcp, [host: "iot.eclipse.org", port: 1883]}]]}, :transient, 5000, :worker, [Tortoise.Connection]}}, :infinity)

** (EXIT) no process the process is not alive or there is no process currently associated with given name possibly because its application isn't started
(elixir) lib/gen_server.ex 979 GenServer.call/3

I am able to connect to the server with the same code in a normal Mix project on the host machine. I am using Erlang/OTP 21 and Elixir 1.8.1.
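The exit message itself hints that the named process may be missing "possibly because its application isn't started". A hedged way to check for that before calling start_child is sketched below; the helper module is hypothetical, and whether this is the actual cause on the device is an assumption. The demo call uses :kernel and :kernel_sup only so the snippet runs without Tortoise installed.

```elixir
defmodule StartCheck do
  # Starts `app` (and its dependencies) if needed, then verifies that a
  # process is registered under `supervisor`.
  def ensure_running(app, supervisor) do
    with {:ok, _started} <- Application.ensure_all_started(app),
         pid when is_pid(pid) <- Process.whereis(supervisor) do
      :ok
    else
      nil -> {:error, {:not_registered, supervisor}}
      {:error, reason} -> {:error, reason}
    end
  end
end

# In the Nerves project this would presumably be:
#   StartCheck.ensure_running(:tortoise, Tortoise.Supervisor)
IO.inspect(StartCheck.ensure_running(:kernel, :kernel_sup))
```

If the check returns an error on the device, the application list of the firmware is the first place to look.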

Pipe failure modes

We should make it possible to specify a timeout on a transmission Pipe. If the socket on the pipe is down when we try to publish something on it, it should go into a selective receive, awaiting a new pipe from the transmitter. This might already be in the process mailbox, but if it is not we should wait the specified timeout and then do something alternative.

That alternative could be to return {:error, :closed} to the caller, but I suggest a pipe can be configured with specified failure modes when the timeout is triggered:

  • :crash crashes the owner process
  • :drop drops the package. This can be used if the process produces a lot of messages and they are not that important (say a sensor that emits every second); it allows us to have a short timeout, and otherwise rely on the process eventually receiving the new pipe in its mailbox.
  • :error sends a regular {:error, :closed}-tuple to the caller, letting the user define what should happen now that the timeout has been reached.
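The three failure modes can be sketched as follows. The {:new_pipe, pipe} message shape, the module name and the :pipe_timeout exit reason are assumptions made for the example.

```elixir
defmodule PipeFailureSketch do
  # Waits for a replacement pipe; on timeout the configured mode decides
  # what happens.
  def await_pipe(timeout, mode) do
    receive do
      {:new_pipe, pipe} -> {:ok, pipe}
    after
      timeout -> on_timeout(mode)
    end
  end

  # :crash takes down the owner process.
  defp on_timeout(:crash), do: exit(:pipe_timeout)
  # :drop silently discards the package.
  defp on_timeout(:drop), do: :ok
  # :error hands the decision back to the caller.
  defp on_timeout(:error), do: {:error, :closed}
end

# A replacement pipe is already in the mailbox, so no timeout is hit.
send(self(), {:new_pipe, :replacement_pipe})
{:ok, :replacement_pipe} = PipeFailureSketch.await_pipe(100, :error)

# No pipe arrives, so the failure mode decides.
:ok = PipeFailureSketch.await_pipe(10, :drop)
{:error, :closed} = PipeFailureSketch.await_pipe(10, :error)
```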

Switch from TravisCI to something else, like CircleCI

TravisCI seems to have a 50% chance of producing a false negative when running the unit/integration tests. It has become a major annoyance having to click the «restart build» button every time the project build status is failing.

Convert the property based tests to stream_data

While quickcheck mini is good and has served us really well so far, it would be nice to experiment with the StreamData module that will be part of the Elixir standard library in the future. That would also remove one dependency from the project.

Possible race condition when passively awaiting a connection

When requesting a socket passively there is a short window in which the connection can be dispatched through the Tortoise.Events module before the subscription for it has been made. The flow is:

  1. The process asks the connection for a connection; if it gets one it uses that, if not it goes to 2
  2. It registers itself for a :connection with the Tortoise.Events pubsub registry
  3. It awaits the :connection message, and uses the socket when it arrives

If the client makes a connection in the short window between 1 and 2 the process will have to wait until a reconnect is made, so it will most likely time out.

A solution would be to:

  1. The process registers itself for a :connection with the Tortoise.Events pubsub registry
  2. The process asks the connection for a connection; if it gets one it uses that and unregisters the :connection from the pubsub, if not it goes to 3
  3. It awaits the :connection message, and uses the socket when it arrives
  4. It unregisters the :connection message for the client_id from the pubsub registry

This would require us to make more changes to the pubsub registry, but I don't think it would be that bad in a "real world scenario"; not that many processes will ask for connections like this.
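The "register first, then ask" ordering can be sketched with a plain Elixir Registry standing in for Tortoise.Events. The module name, the registry key, the {:connection, client_id, socket} message shape and the fetch function are all assumptions for the example.

```elixir
defmodule PassiveAwaitSketch do
  # `fetch` is a zero-arity function that asks the connection for a socket
  # and returns {:ok, socket} or :unknown.
  def await_connection(registry, client_id, fetch, timeout \\ 5_000) do
    # 1. Register before asking, so a connection made in between is not missed.
    {:ok, _owner} = Registry.register(registry, {:connection, client_id}, nil)

    try do
      # 2. Ask for the current connection.
      case fetch.() do
        {:ok, socket} ->
          {:ok, socket}

        :unknown ->
          # 3. Wait for the connection to be announced.
          receive do
            {:connection, ^client_id, socket} -> {:ok, socket}
          after
            timeout -> {:error, :timeout}
          end
      end
    after
      # 4. Always unregister, whichever path was taken.
      Registry.unregister(registry, {:connection, client_id})
    end
  end
end

{:ok, _pid} = Registry.start_link(keys: :duplicate, name: EventsSketch)

{:ok, :socket} =
  PassiveAwaitSketch.await_connection(EventsSketch, "my_client_id", fn -> {:ok, :socket} end)
```

One thing the sketch does not handle: if the connection is announced between steps 1 and 2 and the fetch also succeeds, a stale :connection message is left in the mailbox and would need to be flushed.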

Update subscription list in the Connection-process when subscribing (and unsubscribing)

When a topic is subscribed to, or unsubscribed from, using Tortoise.subscribe/2 and Tortoise.unsubscribe/2, the changes should get reflected in the Connection process state, so it can resubscribe to the correct list if the connection is reestablished with a clean session.

To implement this we need public interfaces for adding and removing subscriptions on the Tortoise.Connection GenServer, and these should alter the %Tortoise.Package.Subscribe{} struct stored under subscriptions in the Connection process state. They should get called when Tortoise.subscribe/2 and Tortoise.unsubscribe/2 are called by the user.

Use a queue instead of a list to store the requested ping messages in the Controller

Tortoise.Connection.Controller keeps track of the pids of the processes that initiated a ping to the server, so it can report the round-trip time to the process that requested it. The pids are stored in a list, because if two processes (for whatever reason) request a ping at the same time, two pings are sent to the server. There is a bug in this: because the list works like a stack, the last requester will get the response to the first ping. The list should be replaced with a :queue.
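A short sketch of the first-in, first-out behaviour that Erlang's :queue module would give here. The wrapper module and its function names are hypothetical; only :queue.new/0, :queue.in/2 and :queue.out/1 are real library calls.

```elixir
defmodule PingQueueSketch do
  def new, do: :queue.new()

  # A new ping request goes to the back of the queue.
  def request(queue, requester), do: :queue.in(requester, queue)

  # A ping response is matched with the oldest outstanding request.
  def respond(queue) do
    case :queue.out(queue) do
      {{:value, requester}, rest} -> {requester, rest}
      {:empty, rest} -> {nil, rest}
    end
  end
end

queue =
  PingQueueSketch.new()
  |> PingQueueSketch.request(:first_requester)
  |> PingQueueSketch.request(:second_requester)

{requester, _queue} = PingQueueSketch.respond(queue)
IO.inspect(requester)
# prints :first_requester
```

With a list used as a stack the same two requests would have handed the first response to :second_requester.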
