
hbmqtt's People

Contributors

akatrevorjay, andvikt, bachp, burnpanck, cfchou, clach04, claws, d21d3q, dansheme, erics465, florianludwig, gdraynz, gitter-badger, hexdump42, hongquan, jcea, jodal, luchermans, mi3z, mitchbte, njouanin, ntoonio, projectgus, romancardenas, shipmints, spacetag, thiswiederkehr, vlcinsky, zhukovalexander, zyp


hbmqtt's Issues

Exceptions on each message received

[2016-02-04 15:43:13,171] :: ERROR - Task exception was never retrieved
future: <Task finished coro=<stream_connected() done, defined at /.../hbmqtt/broker.py:318> exception=ConnectionResetError('Connection lost',)>
Traceback (most recent call last):
File "/usr/lib/python3.5/asyncio/tasks.py", line 239, in _step
result = coro.send(None)
File "/.../hbmqtt/broker.py", line 320, in stream_connected
yield from self.client_connected(listener_name, StreamReaderAdapter(reader), StreamWriterAdapter(writer))
File "/.../hbmqtt/broker.py", line 427, in client_connected
yield from writer.close()
File "/.../hbmqtt/adapters.py", line 168, in close
yield from self._writer.drain()
File "/usr/lib/python3.5/asyncio/streams.py", line 313, in drain
yield from self._protocol._drain_helper()
File "/usr/lib/python3.5/asyncio/streams.py", line 194, in _drain_helper
raise ConnectionResetError('Connection lost')
ConnectionResetError: Connection lost

And after closing the server:

Exception ignored in: <generator object BrokerProtocolHandler.get_next_pending_subscription at 0x7f3aef258780>
Traceback (most recent call last):
File "/.../hbmqtt/mqtt/protocol/broker_handler.py", line 89, in get_next_pending_subscription
File "/usr/lib/python3.5/asyncio/queues.py", line 170, in get
File "/usr/lib/python3.5/asyncio/futures.py", line 227, in cancel
File "/usr/lib/python3.5/asyncio/futures.py", line 242, in _schedule_callbacks
File "/usr/lib/python3.5/asyncio/base_events.py", line 447, in call_soon
File "/usr/lib/python3.5/asyncio/base_events.py", line 456, in _call_soon
File "/usr/lib/python3.5/asyncio/base_events.py", line 284, in _check_closed
RuntimeError: Event loop is closed
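
The first traceback ends with writer.drain() raising ConnectionResetError while the broker is closing the connection. Below is a minimal sketch of a close helper that tolerates a peer that has already gone away. It assumes a writer object with asyncio-style drain() and close() methods; close_quietly is a hypothetical name, not part of hbmqtt, and the async/await syntax needs Python 3.5 or later.

```python
import asyncio


async def close_quietly(writer):
    """Flush and close a stream writer, ignoring an already-dropped connection."""
    try:
        await writer.drain()
    except ConnectionResetError:
        # The peer closed the connection first, so there is nothing left to flush.
        pass
    finally:
        # Always release the transport, whether or not the flush succeeded.
        writer.close()
```

With this shape, a client that disconnects abruptly no longer leaves an unretrieved task exception behind; whether silently dropping the error is acceptable depends on how much the broker wants to log.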

hbmqtt does not catch the exception raised when a zero-length payload is sent

[2016-04-05 08:46:57,759] :: INFO - Listener 'default' bind to 0.0.0.0:1883 (max_connecionts=-1)
[2016-04-05 08:47:07,140] :: INFO - Listener 'default': 1 connections acquired
[2016-04-05 08:47:07,140] :: INFO - Connection from 127.0.0.1:60542 on listener 'default'
[2016-04-05 08:47:08,131] :: ERROR - Task exception was never retrieved
future: <Task finished coro=<stream_connected() done, defined at /usr/local/lib/python3.5/site-packages/hbmqtt/broker.py:319> exception=KeyError('MQTTClient',)>
Traceback (most recent call last):
  File "/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py", line 239, in _step
    result = coro.send(value)
  File "/usr/local/lib/python3.5/site-packages/hbmqtt/broker.py", line 321, in stream_connected
    yield from self.client_connected(listener_name, StreamReaderAdapter(reader), StreamWriterAdapter(writer))
  File "/usr/local/lib/python3.5/site-packages/hbmqtt/broker.py", line 466, in client_connected
    self.retain_message(client_session, app_message.topic, app_message.data, app_message.qos)
  File "/usr/local/lib/python3.5/site-packages/hbmqtt/broker.py", line 543, in retain_message
    del self._retained_messages[topic_name]
KeyError: 'MQTTClient'
^C[20
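
The traceback shows del self._retained_messages[topic_name] raising KeyError when nothing is retained for the topic yet. MQTT 3.1.1 treats a retained PUBLISH with a zero-byte payload as a request to remove any retained message for that topic, so the removal has to tolerate a missing key. A minimal sketch, assuming the retained messages live in a plain dict keyed by topic name (the function below is a hypothetical stand-in, not hbmqtt's actual method):

```python
def retain_message(retained_messages, topic_name, data):
    """Store a retained payload, or clear the entry when the payload is empty."""
    if data:
        retained_messages[topic_name] = data
    else:
        # pop() with a default does not raise if the topic was never retained.
        retained_messages.pop(topic_name, None)
```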

hbmqtt does not reject a zero-byte ClientId with CleanSession = 0

[MQTT-3.1.3-8]
If the Client supplies a zero-byte ClientId with CleanSession set to 0, the Server MUST respond to the CONNECT Packet with a CONNACK return code 0x02 (Identifier rejected) and then close the Network Connection.

[MQTT-3.1.3-9]
If the Server rejects the ClientId it MUST respond to the CONNECT Packet with a CONNACK return code 0x02 (Identifier rejected) and then close the Network Connection.
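
The two statements above can be sketched as a small decision function. The function and constant names are hypothetical; only the return code values come from the quoted specification text. The caller is assumed to send the CONNACK and then close the connection whenever the result is non-zero.

```python
CONNECTION_ACCEPTED = 0x00
IDENTIFIER_REJECTED = 0x02


def connack_code_for_client_id(client_id, clean_session):
    """Return the CONNACK return code implied by the ClientId rules."""
    if client_id == "" and not clean_session:
        # [MQTT-3.1.3-8]: zero-byte ClientId with CleanSession = 0 is rejected.
        return IDENTIFIER_REJECTED
    return CONNECTION_ACCEPTED
```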

hbmqtt does not react correctly to illegal protocol levels

[MQTT-3.1.2-2]
The Server MUST respond to the CONNECT Packet with a CONNACK return code 0x01 (unacceptable protocol level) and then disconnect the Client if the Protocol Level is not supported by the Server.

[MQTT-3.2.2-5]
If a server sends a CONNACK packet containing a non-zero return code it MUST then close the Network Connection.

[2016-04-05 08:40:20,721] :: INFO - Connection from 127.0.0.1:60442 on listener 'default'
[2016-04-05 08:40:20,723] :: ERROR - Task exception was never retrieved
future: <Task finished coro=<stream_connected() done, defined at /usr/local/lib/python3.5/site-packages/hbmqtt/broker.py:319> exception=AttributeError("'ConnectVariableHeader' object has no attribute 'protocol_level'",)>
Traceback (most recent call last):
  File "/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py", line 239, in _step
    result = coro.send(value)
  File "/usr/local/lib/python3.5/site-packages/hbmqtt/broker.py", line 321, in stream_connected
    yield from self.client_connected(listener_name, StreamReaderAdapter(reader), StreamWriterAdapter(writer))
  File "/usr/local/lib/python3.5/site-packages/hbmqtt/broker.py", line 336, in client_connected
    handler, client_session = yield from BrokerProtocolHandler.init_from_connect(reader, writer, self.plugins_manager, loop=self._loop)
  File "/usr/local/lib/python3.5/site-packages/hbmqtt/mqtt/protocol/broker_handler.py", line 147, in init_from_connect
    connect.variable_header.protocol_level)
AttributeError: 'ConnectVariableHeader' object has no attribute 'protocol_level'
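
A minimal sketch of the behaviour the two statements ask for, assuming the broker supports only protocol level 4 (MQTT 3.1.1). The helper names are hypothetical and this is not hbmqtt's code; the CONNACK layout (fixed header 0x20, remaining length 2, acknowledge flags, return code) follows the MQTT 3.1.1 packet format.

```python
UNACCEPTABLE_PROTOCOL_VERSION = 0x01
SUPPORTED_PROTOCOL_LEVELS = frozenset({4})  # assumption: MQTT 3.1.1 only


def connack_packet(return_code):
    """Encode a CONNACK with the session-present flag cleared."""
    return bytes([0x20, 0x02, 0x00, return_code])


def check_protocol_level(level):
    """Return (connack_bytes, must_close) for the protocol level in CONNECT."""
    code = 0x00 if level in SUPPORTED_PROTOCOL_LEVELS else UNACCEPTABLE_PROTOCOL_VERSION
    # [MQTT-3.2.2-5]: any non-zero return code is followed by closing the connection.
    return connack_packet(code), code != 0x00
```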

Memory leak using websocket protocol

I encountered a memory leak when using the websocket protocol (ws://). It occurs in the WebSocketsReader.read() method, because self._protocol.recv() blocks until a websocket message is received.
I believe the websockets library should be used with a single recv loop; otherwise you may end up stacking coroutines that are all blocked in recv.
I have not been able to fix it yet, but the problem is definitely there. Any chance you could look into it?
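
The "single recv loop" idea can be sketched as follows: one background coroutine is the only caller of recv(), and read() only consumes from a queue and a byte buffer. This is an illustration under assumptions, not a fix for hbmqtt: SingleRecvLoopReader is a hypothetical class, and recv is assumed to be a coroutine function that returns one message as bytes, or None at end of stream.

```python
import asyncio


class SingleRecvLoopReader:
    """Hypothetical reader in which exactly one coroutine ever awaits recv()."""

    def __init__(self, recv):
        self._recv = recv
        self._queue = asyncio.Queue()
        self._buffer = bytearray()
        self._eof = False

    async def pump(self):
        # The only place recv() is awaited; run this as a single task.
        while True:
            message = await self._recv()
            await self._queue.put(message)
            if message is None:
                return

    async def read(self, n):
        # Pull queued messages until n bytes are buffered or the stream ended.
        while len(self._buffer) < n and not self._eof:
            message = await self._queue.get()
            if message is None:
                self._eof = True
            else:
                self._buffer.extend(message)
        data = bytes(self._buffer[:n])
        del self._buffer[:n]
        return data
```

The pump() coroutine would be scheduled once per connection (for example with asyncio.ensure_future) and cancelled when the connection closes.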

non wheel packages

PyPI only contains .whl packages, which makes non-wheel installations impossible. Please consider uploading source distributions as well. Thanks.

error message from tests

Running the hbmqtt tests produces a lot of error messages, for example:

pytest -s test_client.py::MQTTClientTest::test_ping
============================================================================================ test session starts ============================================================================================
platform linux -- Python 3.4.3, pytest-3.0.2, py-1.4.31, pluggy-0

========================================================================================= 1 passed in 0.11 seconds ==========================================================================================
[2016-09-07 10:14:43,964] asyncio {base_events.py:1008} ERROR - Task was destroyed but it is pending!
task: <Task pending coro=<stream_connected() running at /home/adona/dev/hbmqtt/hbmqtt/broker.py:336> wait_for=>
[2016-09-07 10:14:43,964] asyncio {base_events.py:1008} ERROR - Task was destroyed but it is pending!
task: <Task pending coro=<wait_disconnect() running at /home/adona/dev/hbmqtt/hbmqtt/mqtt/protocol/broker_handler.py:44> wait_for= cb=[_wait.<locals>._on_completion() at /usr/lib/python3.4/asyncio/tasks.py:399]>
[2016-09-07 10:14:43,964] asyncio {base_events.py:1008} ERROR - Task was destroyed but it is pending!
task: <Task pending coro=<get_next_pending_subscription() running at /home/adona/dev/hbmqtt/hbmqtt/mqtt/protocol/broker_handler.py:89> wait_for= cb=[_wait.<locals>._on_completion() at /usr/lib/python3.4/asyncio/tasks.py:399]>
[2016-09-07 10:14:43,964] asyncio {base_events.py:1008} ERROR - Task was destroyed but it is pending!
task: <Task pending coro=<get_next_pending_unsubscription() running at /home/adona/dev/hbmqtt/hbmqtt/mqtt/protocol/broker_handler.py:94> wait_for= cb=[_wait.<locals>._on_completion() at /usr/lib/python3.4/asyncio/tasks.py:399]>
[2016-09-07 10:14:43,964] asyncio {base_events.py:1008} ERROR - Task was destroyed but it is pending!
task: <Task pending coro=<mqtt_deliver_next_message() running at /home/adona/dev/hbmqtt/hbmqtt/mqtt/protocol/handler.py:462> wait_for= cb=[_wait.<locals>._on_completion() at /usr/lib/python3.4/asyncio/tasks.py:399]>
Exception ignored in: [2016-09-07 10:14:43,965] asyncio {base_events.py:1008} ERROR - Task was destroyed but it is pending!
task: <Task pending coro=<_reader_loop() running at /home/adona/dev/hbmqtt/hbmqtt/mqtt/protocol/handler.py:373> wait_for=>
[2016-09-07 10:14:43,965] hbmqtt.mqtt.protocol.handler {handler.py:432} WARNING - BrokerProtocolHandler Unhandled exception in reader coro: GeneratorExit()
[2016-09-07 10:14:43,965] hbmqtt.mqtt.protocol.handler {broker_handler.py:55} DEBUG - Client disconnecting
[2016-09-07 10:14:43,965] hbmqtt.mqtt.protocol.handler {broker_handler.py:57} DEBUG - Setting waiter result to None
Exception ignored in: <generator object _reader_loop at 0x7fd424ec1288>
Traceback (most recent call last):
File "/home/adona/dev/hbmqtt/hbmqtt/mqtt/protocol/handler.py", line 436, in _reader_loop
yield from self.handle_connection_closed()
File "/home/adona/dev/hbmqtt/hbmqtt/mqtt/protocol/broker_handler.py", line 62, in handle_connection_closed
yield from self.handle_disconnect(None)
File "/usr/lib/python3.4/asyncio/coroutines.py", line 141, in coro
res = func(*args, **kw)
File "/home/adona/dev/hbmqtt/hbmqtt/mqtt/protocol/broker_handler.py", line 58, in handle_disconnect
self._disconnect_waiter.set_result(disconnect)
File "/usr/lib/python3.4/asyncio/futures.py", line 339, in set_result
self._schedule_callbacks()
File "/usr/lib/python3.4/asyncio/futures.py", line 243, in _schedule_callbacks
self._loop.call_soon(callback, self)
File "/usr/lib/python3.4/asyncio/base_events.py", line 427, in call_soon
handle = self._call_soon(callback, args)
File "/usr/lib/python3.4/asyncio/base_events.py", line 436, in _call_soon
self._check_closed()
File "/usr/lib/python3.4/asyncio/base_events.py", line 265, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed

How to fix that?
greetings
Attilio
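
One common way to avoid "Task was destroyed but it is pending!" and "Event loop is closed" messages is to cancel outstanding tasks and wait for the cancellation to finish before the loop is closed. Whether this is the right fix for the hbmqtt test suite is an assumption; cancel_all is a hypothetical helper written with async/await (Python 3.5+).

```python
import asyncio


async def cancel_all(tasks):
    """Cancel the given tasks and wait until every one of them has finished."""
    for task in tasks:
        task.cancel()
    # return_exceptions=True collects each CancelledError instead of raising it here.
    await asyncio.gather(*tasks, return_exceptions=True)
```

A test teardown would call this for the broker and client tasks it started, and only afterwards close the event loop.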

Server crash under heavy load

Hi Folks,

I am evaluating hbmqtt for a video streaming scenario in which I publish MJPEG (a sequence of JPEGs) at a rate of 30-60 FPS. For test purposes I have created the following publisher using the paho client: https://github.com/veter-team/mechspeak/blob/master/src/publish2.py . It publishes all JPEG files found in the directory specified by the frames_dir variable at the beginning of the file. The subscribers are simple web applications, also written with the paho JS client. They receive the JPEGs and display them on a canvas. Relevant sources are:
https://github.com/veter-team/mechspeak/blob/master/www/videoobserver.html
https://github.com/veter-team/mechspeak/blob/master/www/js/utility.js

I was running the following configurations:

  1. hbmqtt and publish2.py running on a Raspberry Pi 2 connected to the network over Ethernet. Three web subscribers (browser windows) running on another machine in the same local network.
  2. hbmqtt and publish2.py running in an Ubuntu 16.04 LTS virtual machine (the VM itself is running on Windows). Three web subscribers running on the same machine but outside of the VM (i.e. natively on Windows).

Publishing rate can be adjusted by editing the sleep interval here:
https://github.com/veter-team/mechspeak/blob/master/src/publish2.py#L38

Running the test on the RPi at approx. 30 FPS, or in the VM at approx. 60 FPS, results in an hbmqtt server crash after about 10 seconds of operation. Please see the attached file for my configuration file and the logs generated by hbmqtt.

crash_info.txt

Single-level wildcards match more than one level

This issue is related to issue #41.
There is another problem, which existed before and which persists with the fix for issue #41. The single-level wildcard, denoted by +, should not match more than one level, but the replacement regex allows it to match level separators (/) as well as other characters. So I guess replace('+', '[/\$\s\w\d]+') should be changed to something like replace('+', '[^/\$\s\w\d]+'), i.e. it should only match characters other than /.
I should note that I did not evaluate this fix thoroughly either. I only checked it with a few strings and did not test special cases such as zero-length topic names.

However, I think issue #41 would still persist when wildcards are used: since the regex would match partially, subscribing to a/+/b would also deliver messages published to a/something/b/anotherthing.
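
The two points above (a + that stays within one level, and no partial matches) can be sketched with Python's re module. This is an illustration, not hbmqtt's implementation: the function names are hypothetical, the character class [^/] is used because a negated class that also lists \w and \s would exclude ordinary topic characters, [^/]* rather than [^/]+ is chosen so that an empty level also matches, and the special handling of topics starting with $ is left out.

```python
import re


def filter_to_regex(topic_filter):
    """Translate an MQTT topic filter into a compiled regular expression."""
    levels = topic_filter.split("/")
    multi_level = levels[-1] == "#"
    if multi_level:
        levels = levels[:-1]
    # '+' matches exactly one level, so it must never match a '/'.
    body = "/".join("[^/]*" if level == "+" else re.escape(level) for level in levels)
    if multi_level:
        # 'a/#' matches 'a' itself and everything below it; a bare '#' matches all.
        body = (body + "(/.*)?") if levels else ".*"
    return re.compile(body)


def topic_matches(topic_filter, topic):
    # fullmatch() anchors both ends, which rules out partial matches.
    return filter_to_regex(topic_filter).fullmatch(topic) is not None
```

For example, topic_matches("a/+/b", "a/something/b") is true, while topic_matches("a/+/b", "a/something/b/anotherthing") is false.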

Implement persistence in broker

Currently, all session data managed by a broker is stored in memory. This is a limitation for serious use.
The HBMQTT broker should be able to persist session data (while offline).
The implementation should be based on the plugin system, which would allow different persistence engines to be used.
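
To make the proposal more concrete, here is a rough sketch of what one persistence engine could look like, using sqlite3 and JSON from the standard library. The class and its save/load/delete methods are entirely hypothetical: they are not an existing hbmqtt plugin interface, and the session is assumed to be representable as JSON-serializable data.

```python
import json
import sqlite3


class SqliteSessionStore:
    """Hypothetical persistence engine that keeps one JSON blob per client id."""

    def __init__(self, path=":memory:"):
        self._db = sqlite3.connect(path)
        self._db.execute(
            "CREATE TABLE IF NOT EXISTS sessions "
            "(client_id TEXT PRIMARY KEY, data TEXT NOT NULL)"
        )

    def save(self, client_id, session):
        with self._db:  # commits on success, rolls back on error
            self._db.execute(
                "INSERT OR REPLACE INTO sessions VALUES (?, ?)",
                (client_id, json.dumps(session)),
            )

    def load(self, client_id):
        row = self._db.execute(
            "SELECT data FROM sessions WHERE client_id = ?", (client_id,)
        ).fetchone()
        return json.loads(row[0]) if row else None

    def delete(self, client_id):
        with self._db:
            self._db.execute("DELETE FROM sessions WHERE client_id = ?", (client_id,))
```

Other engines could expose the same three operations, which is what a plugin-based design would need in order to swap them.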

Broker crashes if a PUBLISH with dup=1 is repeated

This error was found by running an MQTT v3.1.1 compliance test against hbmqtt:

[MQTT-3.3.1-1]
The DUP flag MUST be set to 1 by the Client or Server when it attempts to re-deliver a PUBLISH Packet.

[2016-04-05 06:44:39,596] :: INFO - Listener 'default': 1 connections acquired
[2016-04-05 06:44:40,605] :: INFO - Listener 'default': 2 connections acquired
[2016-04-05 06:44:40,606] :: INFO - Connection from 127.0.0.1:58478 on listener 'default'
[2016-04-05 06:44:40,607] :: WARNING - Delete session : session MQTTClient601540 doesn't exist
[2016-04-05 06:44:42,612] :: INFO - Listener 'default': 1 connections acquired
[2016-04-05 06:44:43,619] :: INFO - Listener 'default': 2 connections acquired
[2016-04-05 06:44:43,619] :: INFO - Connection from 127.0.0.1:58479 on listener 'default'
[2016-04-05 06:44:43,621] :: WARNING - Delete session : session MQTTClient615657 doesn't exist
[2016-04-05 06:45:04,629] :: WARNING - Can't add PUBREL waiter, a waiter already exists for message Id '2'
Traceback (most recent call last):
  File "/usr/local/bin/hbmqtt", line 11, in <module>
    sys.exit(main())
  File "/usr/local/lib/python3.5/site-packages/scripts/broker_script.py", line 73, in main
    loop.run_forever()
  File "/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/base_events.py", line 301, in run_forever
    self._run_once()
  File "/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/base_events.py", line 1198, in _run_once
    handle._run()
  File "/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/events.py", line 125, in _run
    self._callback(*self._args)
  File "/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py", line 239, in _step
    result = coro.send(value)
  File "/usr/local/lib/python3.5/site-packages/hbmqtt/mqtt/protocol/handler.py", line 559, in handle_publish
    yield from self._handle_message_flow(incoming_message)
  File "/usr/local/lib/python3.5/site-packages/hbmqtt/mqtt/protocol/handler.py", line 203, in _handle_message_flow
    yield from self._handle_qos2_message_flow(app_message)
  File "/usr/local/lib/python3.5/site-packages/hbmqtt/mqtt/protocol/handler.py", line 339, in _handle_qos2_message_flow
    raise HBMQTTException(message)
hbmqtt.errors.HBMQTTException: Can't add PUBREL waiter, a waiter already exists for message Id '2'
[2016-04-05 06:45:04,690] :: ERROR - Task was destroyed but it is pending!
task: <Task pending coro=<coro() running at /usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/coroutines.py:198>>
[2016-04-05 06:45:04,690] :: ERROR - Task was destroyed but it is pending!
task: <Task pending coro=<coro() running at /usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/coroutines.py:198>>
[2016-04-05 06:45:04,700] :: ERROR - Task was destroyed but it is pending!
task: <Task pending coro=<Broker._broadcast_loop() running at /usr/local/lib/python3.5/site-packages/hbmqtt/broker.py:624> wait_for=<Future pending cb=[Task._wakeup()]>>
[2016-04-05 06:45:04,701] :: ERROR - Task exception was never retrieved
future: <Task finished coro=<ProtocolHandler.handle_publish() done, defined at /usr/local/lib/python3.5/site-packages/hbmqtt/mqtt/protocol/handler.py:552> exception=HBMQTTException("Can't add PUBREL waiter, a waiter already exists for message Id '2'",)>
Traceback (most recent call last):
  File "/usr/local/bin/hbmqtt", line 11, in <module>
    sys.exit(main())
  File "/usr/local/lib/python3.5/site-packages/scripts/broker_script.py", line 73, in main
    loop.run_forever()
  File "/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/base_events.py", line 301, in run_forever
    self._run_once()
  File "/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/base_events.py", line 1198, in _run_once
    handle._run()
  File "/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/events.py", line 125, in _run
    self._callback(*self._args)
  File "/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py", line 239, in _step
    result = coro.send(value)
  File "/usr/local/lib/python3.5/site-packages/hbmqtt/mqtt/protocol/handler.py", line 559, in handle_publish
    yield from self._handle_message_flow(incoming_message)
  File "/usr/local/lib/python3.5/site-packages/hbmqtt/mqtt/protocol/handler.py", line 203, in _handle_message_flow
    yield from self._handle_qos2_message_flow(app_message)
  File "/usr/local/lib/python3.5/site-packages/hbmqtt/mqtt/protocol/handler.py", line 339, in _handle_qos2_message_flow
    raise HBMQTTException(message)
hbmqtt.errors.HBMQTTException: Can't add PUBREL waiter, a waiter already exists for message Id '2'
[2016-04-05 06:45:04,704] :: ERROR - Task was destroyed but it is pending!
task: <Task pending coro=<ProtocolHandler._reader_loop() running at /usr/local/lib/python3.5/site-packages/hbmqtt/mqtt/protocol/handler.py:369> wait_for=<Future pending cb=[Task._wakeup()]>>
[2016-04-05 06:45:04,704] :: ERROR - Task was destroyed but it is pending!
task: <Task pending coro=<MQTTFixedHeader.from_stream() running at /usr/local/lib/python3.5/site-packages/hbmqtt/mqtt/packet.py:69> cb=[_release_waiter(<Future pendi...sk._wakeup()]>)() at /usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py:336]>
[2016-04-05 06:45:04,704] :: WARNING - BrokerProtocolHandler Unhandled exception in reader coro: GeneratorExit()
Exception ignored in: <generator object ProtocolHandler._reader_loop at 0x1106600a0>
Traceback (most recent call last):
  File "/usr/local/lib/python3.5/site-packages/hbmqtt/mqtt/protocol/handler.py", line 431, in _reader_loop
  File "/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py", line 214, in cancel
  File "/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/futures.py", line 227, in cancel
  File "/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/futures.py", line 242, in _schedule_callbacks
  File "/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/base_events.py", line 453, in call_soon
  File "/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/base_events.py", line 462, in _call_soon
  File "/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/base_events.py", line 289, in _check_closed
RuntimeError: Event loop is closed
[2016-04-05 06:45:04,705] :: ERROR - Task was destroyed but it is pending!
task: <Task pending coro=<ProtocolHandler.handle_publish() running at /usr/local/lib/python3.5/site-packages/hbmqtt/mqtt/protocol/handler.py:559> wait_for=<Future cancelled>>
[2016-04-05 06:45:04,705] :: ERROR - Task was destroyed but it is pending!
task: <Task pending coro=<BrokerProtocolHandler.wait_disconnect() running at /usr/local/lib/python3.5/site-packages/hbmqtt/mqtt/protocol/broker_handler.py:44> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py:400]>
[2016-04-05 06:45:04,706] :: ERROR - Task was destroyed but it is pending!
task: <Task pending coro=<BrokerProtocolHandler.get_next_pending_unsubscription() running at /usr/local/lib/python3.5/site-packages/hbmqtt/mqtt/protocol/broker_handler.py:94> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py:400]>
[2016-04-05 06:45:04,706] :: ERROR - Task was destroyed but it is pending!
task: <Task pending coro=<BrokerProtocolHandler.get_next_pending_subscription() running at /usr/local/lib/python3.5/site-packages/hbmqtt/mqtt/protocol/broker_handler.py:89> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py:400]>
[2016-04-05 06:45:04,706] :: ERROR - Task was destroyed but it is pending!
task: <Task pending coro=<ProtocolHandler.mqtt_deliver_next_message() running at /usr/local/lib/python3.5/site-packages/hbmqtt/mqtt/protocol/handler.py:457> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py:400]>
[2016-04-05 06:45:04,707] :: ERROR - Task was destroyed but it is pending!
task: <Task pending coro=<stream_connected() running at /usr/local/lib/python3.5/site-packages/hbmqtt/broker.py:321> wait_for=<Future pending cb=[Task._wakeup()]>>
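
The log shows the broker raising an exception when a QoS 2 PUBLISH arrives with a packet identifier that is still waiting for its PUBREL. A minimal sketch of a receiver that treats such a re-delivery as idempotent instead, loosely following the QoS 2 flow of the MQTT 3.1.1 specification. The class and method names are hypothetical, and the returned tuples merely stand in for the packets a real broker would send.

```python
class Qos2Receiver:
    """Hypothetical receiver-side state for QoS 2 PUBLISH packets."""

    def __init__(self):
        self._awaiting_pubrel = set()
        self.delivered = []

    def on_publish(self, packet_id, payload):
        if packet_id not in self._awaiting_pubrel:
            # First delivery: remember the id and hand the message onwards once.
            self._awaiting_pubrel.add(packet_id)
            self.delivered.append(payload)
        # A re-delivery (DUP=1) is acknowledged again but not delivered twice.
        return ("PUBREC", packet_id)

    def on_pubrel(self, packet_id):
        # discard() tolerates a PUBREL for an id that is no longer tracked.
        self._awaiting_pubrel.discard(packet_id)
        return ("PUBCOMP", packet_id)
```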

hbmqtt accepts malformed packets but treats them as NOOP

MQTT brokers and clients must close the network connection in case of a protocol violation, according to:

Unless stated otherwise, if either the Server or Client encounters a protocol violation, it MUST close the Network Connection on which it received that Control Packet which caused the protocol violation [MQTT-4.8.0-1]

However, I have noticed that hbmqtt handles malformed packets or illegally sent packets in some cases by keeping the network connection open but ignoring all other subsequently received packets. In other cases, illegally sent packets are ignored but normal operation is continued.

Here are a few examples with the corresponding specification statements:

  • Subscriptions with QoS = 3 are acknowledged but treated as NOOP, i.e. a connection still persists afterwards, but messages published to the corresponding topic are not forwarded to subscribers with QoS 3. This violates:

The Server MUST treat a SUBSCRIBE packet as malformed and close the Network Connection if any of Reserved bits in the payload are non-zero, or QoS is not 0,1 or 2 [MQTT-3-8.3-4].

  • The same holds for publishing with QoS = 3 which violates:

A PUBLISH Packet MUST NOT have both QoS bits set to 1. If a Server or Client receives a PUBLISH Packet which has both QoS bits set to 1 it MUST close the Network Connection [MQTT-3.3.1-4].

  • If a packet other than CONNECT is sent as first packet, hbmqtt enters a state where all packets are ignored, but the network connection is kept open. This violates:

[MQTT-3.1.0-1] After a Network Connection is established by a Client to a Server, the first Packet
sent from the Client to the Server MUST be a CONNECT Packet.

and [MQTT-4.8.0-1] cited above.

  • If a CONNECT packet is sent while being connected, it is ignored and normal operation is continued which is a violation of:

A Client can only send the CONNECT Packet once over a Network Connection. The Server MUST
process a second CONNECT Packet sent from a Client as a protocol violation and disconnect the Client [MQTT-3.1.0-2].

  • Another error that is debatable is the following:
    hbmqtt treats PUBLISH packets with the DUP-flag set to true and QoS = 0 as NOOP as well, i.e. neither the published message is forwarded to subscribed clients, nor is the network connection closed. Strictly speaking, this violates

The DUP flag MUST be set to 0 for all QoS 0 messages [MQTT-3.3.1-2]

and [MQTT-4.8.0-1] cited above.

Hence, the network connection should probably be closed. However, I have looked at four other implementations, which all simply ignore the DUP flag and forward the message. This is probably better suited to the needs of users, although it technically violates the specification.
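
The PUBLISH-related checks can be sketched as a validation of the flag bits in the fixed header (bit 3 = DUP, bits 2-1 = QoS, bit 0 = RETAIN in MQTT 3.1.1). ProtocolViolation and parse_publish_flags are hypothetical names; the caller is assumed to close the network connection when the exception is raised. The debatable DUP-with-QoS-0 case is only rejected when strict_dup is set, mirroring the lenient behaviour of the other implementations mentioned above.

```python
class ProtocolViolation(Exception):
    """Hypothetical signal that the caller must close the network connection."""


def parse_publish_flags(flags, strict_dup=False):
    """Split the low nibble of a PUBLISH fixed header into (dup, qos, retain)."""
    dup = bool(flags & 0x08)
    qos = (flags >> 1) & 0x03
    retain = bool(flags & 0x01)
    if qos == 3:
        raise ProtocolViolation("PUBLISH with both QoS bits set [MQTT-3.3.1-4]")
    if strict_dup and dup and qos == 0:
        raise ProtocolViolation("DUP set on a QoS 0 PUBLISH [MQTT-3.3.1-2]")
    return dup, qos, retain
```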

Constant session doesn't exist warnings

For each transmission of my mqtt client I see this message (with different IDs) in my logs:
hbmqtt.broker: Delete session : session paho/AD96ADC1E3EF4820F0 doesn't exist
For the client I use paho.mqtt.publish.single. The transmission of data is working fine.

Can I simply ignore the message? Do I need to look into the code of broker? Do I need to change parameters on the client?

How can I load an external plugin?

Hi,
I would like to run an authentication plugin. I read in the documentation that plugins are declared through entry_points in setup.py, but is there a simpler way?

hbmqtt accepts UTF8 topic strings with illegal characters in PUBLISH

[MQTT-1.5.3-1]
The character data in a UTF-8 encoded string MUST be well-formed UTF-8 as defined by the Unicode specification [Unicode] and restated in RFC 3629 [RFC3629]. In particular this data MUST NOT include encodings of code points between U+D800 and U+DFFF. If a Server or Client receives a Control Packet containing ill-formed UTF-8 it MUST close the Network Connection.

[MQTT-1.5.3-2]
A UTF-8 encoded string MUST NOT include an encoding of the null character U+0000. If a receiver (Server or Client) receives a Control Packet containing U+0000 it MUST close the Network Connection.

[MQTT-1.5.3-3]
A UTF-8 encoded sequence 0xEF 0xBB 0xBF is always to be interpreted to mean U+FEFF ("ZERO WIDTH NO-BREAK SPACE") wherever it appears in a string and MUST NOT be skipped over or stripped off by a packet receiver.
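
A minimal sketch of a decoder that enforces the three statements above with Python's built-in codec. decode_mqtt_string is a hypothetical helper; the caller is assumed to close the network connection when it raises. Python's strict "utf-8" codec already rejects ill-formed input, including encoded surrogates, and unlike "utf-8-sig" it does not strip a leading U+FEFF.

```python
def decode_mqtt_string(raw):
    """Decode the bytes of an MQTT UTF-8 string, raising ValueError if illegal."""
    # Strict decoding raises UnicodeDecodeError (a ValueError subclass) for
    # ill-formed UTF-8, including encodings of U+D800..U+DFFF [MQTT-1.5.3-1].
    text = raw.decode("utf-8")
    if "\x00" in text:
        raise ValueError("U+0000 is not allowed in MQTT strings [MQTT-1.5.3-2]")
    # A leading U+FEFF is kept as ordinary data [MQTT-1.5.3-3].
    return text
```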

The documentation does not include imports.

It's very hard to follow the documentation because the examples don't show where the imports come from. To run an example, I have to look through the docs and see where each name is imported from. Please add imports to the doc examples.

MQTT errors with Home-Assistant

I am trying to add HBMQTT to Home-Assistant. I used the default config as indicated here:

mqtt:
  embedded:

I get the following error:

homeassistant.bootstrap: Invalid config for [mqtt]: expected dict for dictionary value @ data['mqtt']['embedded']. Got None

@balloob suggested adding {} after embedded, so I modified the config to:

mqtt:
  embedded: {}

Now, I get the following errors:

16-09-07 21:33:57 homeassistant.components.mqtt.server: Error initializing MQTT server
Traceback (most recent call last):
  File "/home/hass/.homeassistant/deps/hbmqtt/broker.py", line 184, in _build_listeners_config
    listeners_config = broker_config['listeners']
KeyError: 'listeners'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/srv/hass/hass_venv/lib/python3.4/site-packages/homeassistant/components/mqtt/server.py", line 53, in start
    loop.run_until_complete(start_server)
  File "/usr/lib/python3.4/asyncio/base_events.py", line 271, in run_until_complete
    self.run_forever()
  File "/usr/lib/python3.4/asyncio/base_events.py", line 244, in run_forever
    self._run_once()
  File "/usr/lib/python3.4/asyncio/base_events.py", line 1081, in _run_once
    handle._run()
  File "/usr/lib/python3.4/asyncio/events.py", line 120, in _run
    self._callback(*self._args)
  File "/usr/lib/python3.4/asyncio/tasks.py", line 237, in _step
    result = next(coro)
  File "/srv/hass/hass_venv/lib/python3.4/site-packages/homeassistant/components/mqtt/server.py", line 23, in broker_coro
    broker = Broker(config, loop)
  File "/home/hass/.homeassistant/deps/hbmqtt/broker.py", line 156, in __init__
    self._build_listeners_config(self.config)
  File "/home/hass/.homeassistant/deps/hbmqtt/broker.py", line 191, in _build_listeners_config
    raise BrokerException("Listener config not found invalid: %s" % ke)
hbmqtt.broker.BrokerException: Listener config not found invalid: 'listeners'
16-09-07 21:33:57 homeassistant.components.mqtt: Unable to start broker and auto-configure MQTT.
16-09-07 21:33:57 homeassistant.bootstrap: component mqtt failed to initialize
16-09-07 21:33:57 asyncio: Task exception was never retrieved
future: <Task finished coro=<broker_coro() done, defined at /srv/hass/hass_venv/lib/python3.4/site-packages/homeassistant/components/mqtt/server.py:19> exception=BrokerException("Listener config not found invalid: 'listeners'",)>
Traceback (most recent call last):
  File "/home/hass/.homeassistant/deps/hbmqtt/broker.py", line 184, in _build_listeners_config
    listeners_config = broker_config['listeners']
KeyError: 'listeners'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/srv/hass/hass_venv/lib/python3.4/site-packages/homeassistant/components/mqtt/server.py", line 53, in start
    loop.run_until_complete(start_server)
  File "/usr/lib/python3.4/asyncio/base_events.py", line 271, in run_until_complete
    self.run_forever()
  File "/usr/lib/python3.4/asyncio/base_events.py", line 244, in run_forever
    self._run_once()
  File "/usr/lib/python3.4/asyncio/base_events.py", line 1081, in _run_once
    handle._run()
  File "/usr/lib/python3.4/asyncio/events.py", line 120, in _run
    self._callback(*self._args)
  File "/usr/lib/python3.4/asyncio/tasks.py", line 237, in _step
    result = next(coro)
  File "/srv/hass/hass_venv/lib/python3.4/site-packages/homeassistant/components/mqtt/server.py", line 23, in broker_coro
    broker = Broker(config, loop)
  File "/home/hass/.homeassistant/deps/hbmqtt/broker.py", line 156, in __init__
    self._build_listeners_config(self.config)
  File "/home/hass/.homeassistant/deps/hbmqtt/broker.py", line 191, in _build_listeners_config
    raise BrokerException("Listener config not found invalid: %s" % ke)
hbmqtt.broker.BrokerException: Listener config not found invalid: 'listeners'
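
Both errors come from the broker receiving a configuration without a listeners entry (first None, then an empty dict). One way the embedding side could guard against this is to merge in a default listener before constructing the broker. This is a sketch under assumptions: with_default_listener is a hypothetical helper, and the "type" and "bind" keys and their values are assumed from hbmqtt's documented configuration format rather than taken from this report.

```python
def with_default_listener(config):
    """Return a copy of config that is guaranteed to contain a listeners entry."""
    merged = dict(config or {})  # also accepts None, as in the first error above
    # Assumed layout: one TCP listener named 'default' on the standard MQTT port.
    merged.setdefault(
        "listeners", {"default": {"type": "tcp", "bind": "0.0.0.0:1883"}}
    )
    return merged
```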

HBMQTTException and ClientException deriving from BaseException instead of Exception

Hi,

I noticed that HBMQTTException and ClientException both derive from BaseException. The docs say asyncio logs a message when an exception is never consumed, but a BaseException also aborts the whole event loop, even while other tasks are running just fine.

Is there a way to solve this problem? Or may I suggest changing them to derive from Exception instead?
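
One practical consequence of the base class can be shown without asyncio: a generic `except Exception` handler does not catch classes that derive directly from BaseException. A small sketch; both exception classes below are hypothetical stand-ins, not the hbmqtt classes:

```python
class ErrorFromBaseException(BaseException):
    """Stand-in for an exception that derives from BaseException."""


class ErrorFromException(Exception):
    """Stand-in for an exception that derives from Exception."""


def caught_by_generic_handler(exc_type):
    """Return True if `except Exception` catches an instance of exc_type."""
    try:
        raise exc_type("boom")
    except Exception:
        return True
    except BaseException:
        # Only reached for classes outside the Exception hierarchy.
        return False


print(caught_by_generic_handler(ErrorFromException))      # True
print(caught_by_generic_handler(ErrorFromBaseException))  # False
```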

missing import in example client_subscribe.py

file: hbmqtt/samples/client_subscribe.py
ClientException is used but not imported, which raises an error when the exception is thrown.

Fix:
change line 4 to

from hbmqtt.client import MQTTClient, ConnectException, ClientException

Broker disconnects when receiving messages > 256 kB

[2016-04-05 07:09:02,990] :: INFO - Listener 'default' bind to 0.0.0.0:1883 (max_connecionts=-1)
[2016-04-05 07:09:36,077] :: INFO - Listener 'default': 1 connections acquired
[2016-04-05 07:09:36,077] :: INFO - Connection from 127.0.0.1:59001 on listener 'default'
[2016-04-05 07:09:36,079] :: WARNING - Delete session : session MQTTClient70625 doesn't exist
[2016-04-05 07:09:54,155] :: WARNING - Received PUBREL for unknown pending message with Id: 26471
[2016-04-05 07:09:54,158] :: WARNING - Received PUBREL for unknown pending message with Id: 26471
[2016-04-05 07:09:54,160] :: WARNING - Received PUBREL for unknown pending message with Id: 26471
[2016-04-05 07:09:54,160] :: WARNING - Received PUBREL for unknown pending message with Id: 26471
[2016-04-05 07:09:54,161] :: WARNING - Received PUBREL for unknown pending message with Id: 25090
[2016-04-05 07:09:54,162] :: WARNING - MQTTClient70625 Received reserved packet, which is forbidden: closing connection
[2016-04-05 07:09:54,165] :: INFO - Listener 'default': 0 connections acquired

Hierarchical topics not handled correctly

Hierarchical topics are not handled correctly. More concretely, if I subscribe to a topic at a high level in the hierarchy I will also receive messages published at a lower level.

I tested the following simple example:

  1. Connect with Client C1 and receive ConnAck
  2. Subscribe with C1 to topic "my_topic" and receive SubAck
  3. Publish message "my_message" with C1 and QoS 1 to "my_topic/other_topic" and receive PubAck and additionally a Publish-message for "my_topic/other_topic" with message "my_message"

Thus, the subscription to "my_topic" works like a subscription to "my_topic/#". I interpret this as a violation of the MQTT standard. I also checked emqtt and mosquitto, neither of which sends a Publish in response to the last step.

A quick fix would be to change Line 623 from
match_pattern = re.compile(a_filter.replace('#', '.*').replace('$', '\$').replace('+', '[/\$\s\w\d]+'))
to
match_pattern = re.compile("^"+a_filter.replace('#', '.*').replace('$', '\$').replace('+', '[/\$\s\w\d]+')+"$")

However, I am not a Python expert, and I have not tested the fix thoroughly.
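
The effect of the proposed anchoring can be checked in isolation. A minimal sketch, assuming the compiled pattern is applied with match(); build_pattern is a hypothetical stand-in for the code around line 623, not the hbmqtt function itself:

```python
import re


def build_pattern(a_filter, anchored):
    """Compile a topic filter using the substitutions quoted in the issue."""
    body = (a_filter.replace('#', '.*')
                    .replace('$', r'\$')
                    .replace('+', r'[/\$\s\w\d]+'))
    # With anchored=True the pattern must cover the whole topic name.
    return re.compile(("^" + body + "$") if anchored else body)


unanchored = build_pattern("my_topic", anchored=False)
anchored = build_pattern("my_topic", anchored=True)

# Without "$" the filter matches any topic that merely starts with "my_topic".
print(bool(unanchored.match("my_topic/other_topic")))  # True
print(bool(anchored.match("my_topic/other_topic")))    # False

# An explicit multi-level wildcard still matches after anchoring.
print(bool(build_pattern("my_topic/#", anchored=True).match("my_topic/other_topic")))  # True
```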

deliver_message hangs using mosquitto on websockets

Using a simple mosquitto broker on the websockets protocol, the method deliver_message seems to hang for a long time when sending big messages (we use hbmqtt to publish JSON payloads). The longer the message is, the longer the method .recv() in the WebsocketReader will hang.
Is this a known issue?
Changing the protocol to mqtt works like a charm so it does not appear to be a broker problem.

mqtt over websockets from browser

Hello,

I cannot connect an MQTT client running in the browser (using mqtt.js) to the hbmqtt broker.

The broker prints:

INFO :: websockets.protocol :: Failing the WebSocket connection: 1006

The browser prints:

WebSocket connection to 'ws://127.0.0.1:8888/' failed: Error during WebSocket handshake: Sent non-empty 'Sec-WebSocket-Protocol' header but no response was received

Is this broker incompatible with browser-based MQTT-over-WebSockets clients?

Thanks in advance,

Vasco Santos

$SYS/broker duplicated in $SYS/broker topics

Ran:

hbmqtt_sub --url mqtt://<broker_address>:<broker_port> -t '$SYS/#' -d

Got back a lot of stuff like:

[2016-05-06 05:47:36,762] :: DEBUG - hbmqtt_sub/<client_id> <-in-- PublishPacket(ts=2016-05-06 05:47:36.761783, fixed=MQTTFixedHeader(length=45, flags=0x0), variable=PublishVariableHeader(topic=$SYS/broker/$SYS/broker/load/bytes/sent, packet_id=None), payload=PublishPayload(data="bytearray(b'1993')"))

When I ran it all through a regular expression filter, I got:

[0] => topic=$SYS/broker/$SYS/broker/load/bytes/received,
[1] => topic=$SYS/broker/$SYS/broker/load/bytes/sent,
[2] => topic=$SYS/broker/$SYS/broker/messages/received,
[3] => topic=$SYS/broker/$SYS/broker/messages/sent,
[4] => topic=$SYS/broker/$SYS/broker/time,
[5] => topic=$SYS/broker/$SYS/broker/uptime,
[6] => topic=$SYS/broker/$SYS/broker/uptime/formated,
[7] => topic=$SYS/broker/$SYS/broker/clients/connected,
[8] => topic=$SYS/broker/$SYS/broker/clients/disconnected,
[9] => topic=$SYS/broker/$SYS/broker/clients/maximum,
[10] => topic=$SYS/broker/$SYS/broker/clients/total,
[11] => topic=$SYS/broker/$SYS/broker/messages/inflight,
[12] => topic=$SYS/broker/$SYS/broker/messages/inflight/in,
[13] => topic=$SYS/broker/$SYS/broker/messages/inflight/out,
[14] => topic=$SYS/broker/$SYS/broker/messages/inflight/stored,
[15] => topic=$SYS/broker/$SYS/broker/messages/publish/received,
[16] => topic=$SYS/broker/$SYS/broker/messages/publish/sent,
[17] => topic=$SYS/broker/$SYS/broker/messages/retained/count,
[18] => topic=$SYS/broker/$SYS/broker/messages/subscriptions/count,

That is, the $SYS/broker prefix is repeated in the topic, giving $SYS/broker/$SYS/broker.

I'm assuming this isn't correct.

I've upgraded to 0.7, but the client is reporting 0.6.3 when I connect.

Thanks!
...Andrew

Broker startup failed: an integer is required (got type str)

Python 3.5.1+ (default, Mar 30 2016, 22:46:26)
Ubuntu 16.04 LTS
hbmqtt installed either with pip3 or from the latest GitHub sources.

Attempting to run hbmqtt causes the error below. It seems that casting port to int, i.e. int(port), in broker.py at lines 256 and 263 fixes this issue. However, I did not check whether there are other places where a similar problem may occur.

[2016-06-10 14:35:13,583] :: ERROR - Broker startup failed: an integer is required (got type str)
Traceback (most recent call last):
  File "/usr/local/lib/python3.5/dist-packages/hbmqtt/broker.py", line 258, in start
    loop=self._loop)
  File "/usr/lib/python3.5/asyncio/streams.py", line 116, in start_server
    return (yield from loop.create_server(factory, host, port, **kwds))
  File "/usr/lib/python3.5/asyncio/base_events.py", line 949, in create_server
    sock.bind(sa)
TypeError: an integer is required (got type str)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/bin/hbmqtt", line 11, in <module>
    sys.exit(main())
  File "/usr/local/lib/python3.5/dist-packages/scripts/broker_script.py", line 72, in main
    loop.run_until_complete(broker.start())
  File "/usr/lib/python3.5/asyncio/base_events.py", line 361, in run_until_complete
    self.run_forever()
  File "/usr/lib/python3.5/asyncio/base_events.py", line 331, in run_forever
    self._run_once()
  File "/usr/lib/python3.5/asyncio/base_events.py", line 1298, in _run_once
    handle._run()
  File "/usr/lib/python3.5/asyncio/events.py", line 125, in _run
    self._callback(*self._args)
  File "/usr/lib/python3.5/asyncio/tasks.py", line 308, in _wakeup
    self._step()
  File "/usr/lib/python3.5/asyncio/tasks.py", line 240, in _step
    result = coro.send(None)
  File "/usr/local/lib/python3.5/dist-packages/hbmqtt/broker.py", line 280, in start
    raise BrokerException("Broker instance can't be started: %s" % e)
hbmqtt.broker.BrokerException: Broker instance can't be started: an integer is required (got type str)
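
The int(port) cast mentioned above can be sketched as a small parsing helper. This assumes the bind setting is a single "address:port" string that is split on ':'; parse_bind is a hypothetical name, not an hbmqtt function:

```python
def parse_bind(bind):
    """Split an 'address:port' string and return the port as an int."""
    address, port = bind.split(':')
    # Splitting a string always yields strings, while the socket layer
    # expects an integer port, hence the explicit conversion.
    return address, int(port)


print(parse_bind('0.0.0.0:1883'))  # ('0.0.0.0', 1883)
```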

get_message timeout

Should the use of the timeout parameter generate the following on a timeout?
Traceback (most recent call last):
  ...
    message = await self.C.deliver_message(timeout=3)
  File "py3\lib\site-packages\hbmqtt\client.py", line 337, in deliver_message
    if deliver_task.exception():
  File "c:\python35\Lib\asyncio\futures.py", line 288, in exception
    raise InvalidStateError('Exception is not set.')
asyncio.futures.InvalidStateError: Exception is not set.
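
The InvalidStateError is what asyncio raises when exception() is called on a task that has not finished yet. A minimal sketch of one way to guard against that, assuming the delivery task is awaited with asyncio.wait and a timeout; result_or_timeout is a hypothetical helper and does not reproduce the hbmqtt code:

```python
import asyncio


async def result_or_timeout(coro, timeout):
    """Await coro for at most `timeout` seconds (hypothetical helper)."""
    task = asyncio.ensure_future(coro)
    done, _pending = await asyncio.wait([task], timeout=timeout)
    if task not in done:
        # exception() and result() raise InvalidStateError on an unfinished
        # task, so cancel it and report the timeout instead of inspecting it.
        task.cancel()
        raise asyncio.TimeoutError()
    # The task is finished: result() returns its value or re-raises its exception.
    return task.result()
```

With this shape, a caller sees asyncio.TimeoutError on a timeout rather than InvalidStateError.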

hbmqtt matches $SYS/# topics with #

[MQTT-4.7.2-1]
The Server MUST NOT match Topic Filters starting with a wildcard character (# or +) with Topic Names beginning with a $ character.
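
The rule quoted above can be expressed as a guard that runs before any wildcard matching. A minimal sketch; the function name is hypothetical and it only covers this one rule, not full topic matching:

```python
def wildcard_filter_allowed(topic_filter, topic_name):
    """Apply [MQTT-4.7.2-1]: a filter that starts with '#' or '+' must not
    match a topic name that starts with '$'."""
    if topic_name.startswith('$') and topic_filter[:1] in ('#', '+'):
        return False
    # Anything else is left to the regular topic matching logic.
    return True


print(wildcard_filter_allowed('#', '$SYS/broker/uptime'))       # False
print(wildcard_filter_allowed('$SYS/#', '$SYS/broker/uptime'))  # True
print(wildcard_filter_allowed('#', 'my_topic'))                 # True
```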

Websockets aren't working

When connecting a client to the mqtt broker over a websocket I get the following error:

:: WARNING :: hbmqtt.broker :: [MQTT-3.1.0-1] (client @=192.168.0.101:53088): Can't read first packet an CONNECT: No more data

Reproduce:
Start hbmqtt broker
Connect e.g. by using this webclient: http://www.hivemq.com/demos/websocket-client/

This might be related to the following modification in websockets:
http://websockets.readthedocs.org/en/stable/changelog.html?highlight=legacy_recv
but even when using the legacy websockets package I get the same behavior.

Thanks for publishing this here on GitHub. I very much like this Python MQTT broker.

configuration file issue: fails to find 'bind' on default

Using basically the default configuration file with IP addresses added, I get an error when I try to use it:

hbmqtt.conf.txt

`[2016-04-26 12:33:33,739] :: ERROR - Broker startup failed: 'bind'
Traceback (most recent call last):
File "/home/acoile/hbmqtt/lib/python3.5/site-packages/hbmqtt/broker.py", line 248, in *start
address, port = listener['bind'].split(':')
KeyError: 'bind'
*

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File "bin/hbmqtt", line 11, in <module>
sys.exit(main())
File "/home/acoile/hbmqtt/lib/python3.5/site-packages/scripts/broker_script.py", line 72, in main
loop.run_until_complete(broker.start())
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/base_events.py", line 325, in run_until_complete
self.run_forever()
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/base_events.py", line 295, in run_forever
self._run_once()
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/base_events.py", line 1254, in _run_once
handle._run()
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/events.py", line 125, in _run
self._callback(*self._args)
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py", line 239, in _step
result = coro.send(None)
File "/home/acoile/hbmqtt/lib/python3.5/site-packages/hbmqtt/broker.py", line 276, in start
raise BrokerException("Broker instance can't be started: %s" % e)
hbmqtt.broker.BrokerException: Broker instance can't be started: 'bind'
[2016-04-26 12:33:33,751] :: ERROR - Task was destroyed but it is pending!
task: <Task pending coro=<coro() running at /Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/coroutines.py:204>>`

This appears to happen because there is no 'bind: x.y.z.t:p' entry under listeners:default.

Shouldn't it be applying listeners:default: from hbmqtt's default configuration, then listeners:default: from this configuration, and then adding the specific items in each listeners: section, and just processing those sections? In other words, the defaults are items to be integrated into the configuration for each listeners:, not something that should be processed on their own.
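
The behaviour proposed above, where 'default' only supplies values for the other listeners, could look roughly like this. It is a sketch of the suggestion, not of how hbmqtt actually resolves its configuration; resolve_listeners and the listener name 'tcp-1' are hypothetical:

```python
def resolve_listeners(listeners):
    """Merge the 'default' entry into every other listener section."""
    defaults = listeners.get('default', {})
    resolved = {}
    for name, settings in listeners.items():
        if name == 'default':
            # Defaults are only a template here, never a listener of their own.
            continue
        merged = dict(defaults)
        merged.update(settings)  # listener-specific values win over defaults
        resolved[name] = merged
    return resolved


listeners = {
    'default': {'type': 'tcp'},
    'tcp-1': {'bind': '127.0.0.1:1883'},
}
print(resolve_listeners(listeners))
# {'tcp-1': {'type': 'tcp', 'bind': '127.0.0.1:1883'}}
```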

Thanks for any help you can provide.
