njouanin / hbmqtt
MQTT client/broker using Python asynchronous I/O
License: MIT License
[2016-02-04 15:43:13,171] :: ERROR - Task exception was never retrieved
future: <Task finished coro=<stream_connected() done, defined at /.../hbmqtt/broker.py:318> exception=ConnectionResetError('Connection lost',)>
Traceback (most recent call last):
File "/usr/lib/python3.5/asyncio/tasks.py", line 239, in _step
result = coro.send(None)
File "/.../hbmqtt/broker.py", line 320, in stream_connected
yield from self.client_connected(listener_name, StreamReaderAdapter(reader), StreamWriterAdapter(writer))
File "/.../hbmqtt/broker.py", line 427, in client_connected
yield from writer.close()
File "/.../hbmqtt/adapters.py", line 168, in close
yield from self._writer.drain()
File "/usr/lib/python3.5/asyncio/streams.py", line 313, in drain
yield from self._protocol._drain_helper()
File "/usr/lib/python3.5/asyncio/streams.py", line 194, in _drain_helper
raise ConnectionResetError('Connection lost')
ConnectionResetError: Connection lost
And after closing the server:
Exception ignored in: <generator object BrokerProtocolHandler.get_next_pending_subscription at 0x7f3aef258780>
Traceback (most recent call last):
File "/.../hbmqtt/mqtt/protocol/broker_handler.py", line 89, in get_next_pending_subscription
File "/usr/lib/python3.5/asyncio/queues.py", line 170, in get
File "/usr/lib/python3.5/asyncio/futures.py", line 227, in cancel
File "/usr/lib/python3.5/asyncio/futures.py", line 242, in _schedule_callbacks
File "/usr/lib/python3.5/asyncio/base_events.py", line 447, in call_soon
File "/usr/lib/python3.5/asyncio/base_events.py", line 456, in _call_soon
File "/usr/lib/python3.5/asyncio/base_events.py", line 284, in _check_closed
RuntimeError: Event loop is closed
[2016-04-05 08:46:57,759] :: INFO - Listener 'default' bind to 0.0.0.0:1883 (max_connecionts=-1)
[2016-04-05 08:47:07,140] :: INFO - Listener 'default': 1 connections acquired
[2016-04-05 08:47:07,140] :: INFO - Connection from 127.0.0.1:60542 on listener 'default'
[2016-04-05 08:47:08,131] :: ERROR - Task exception was never retrieved
future: <Task finished coro=<stream_connected() done, defined at /usr/local/lib/python3.5/site-packages/hbmqtt/broker.py:319> exception=KeyError('MQTTClient',)>
Traceback (most recent call last):
File "/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py", line 239, in _step
result = coro.send(value)
File "/usr/local/lib/python3.5/site-packages/hbmqtt/broker.py", line 321, in stream_connected
yield from self.client_connected(listener_name, StreamReaderAdapter(reader), StreamWriterAdapter(writer))
File "/usr/local/lib/python3.5/site-packages/hbmqtt/broker.py", line 466, in client_connected
self.retain_message(client_session, app_message.topic, app_message.data, app_message.qos)
File "/usr/local/lib/python3.5/site-packages/hbmqtt/broker.py", line 543, in retain_message
del self._retained_messages[topic_name]
KeyError: 'MQTTClient'
[MQTT-3.1.3-8]
If the Client supplies a zero-byte ClientId with CleanSession set to 0, the Server MUST respond to the CONNECT Packet with a CONNACK return code 0x02 (Identifier rejected) and then close the Network Connection.
[MQTT-3.1.3-9]
If the Server rejects the ClientId it MUST respond to the CONNECT Packet with a CONNACK return code 0x02 (Identifier rejected) and then close the Network Connection.
[MQTT-3.1.2-2]
The Server MUST respond to the CONNECT Packet with a CONNACK return code 0x01 (unacceptable protocol level) and then disconnect the Client if the Protocol Level is not supported by the Server.
[MQTT-3.2.2-5]
If a server sends a CONNACK packet containing a non-zero return code it MUST then close the Network Connection.
[2016-04-05 08:40:20,721] :: INFO - Connection from 127.0.0.1:60442 on listener 'default'
[2016-04-05 08:40:20,723] :: ERROR - Task exception was never retrieved
future: <Task finished coro=<stream_connected() done, defined at /usr/local/lib/python3.5/site-packages/hbmqtt/broker.py:319> exception=AttributeError("'ConnectVariableHeader' object has no attribute 'protocol_level'",)>
Traceback (most recent call last):
File "/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py", line 239, in _step
result = coro.send(value)
File "/usr/local/lib/python3.5/site-packages/hbmqtt/broker.py", line 321, in stream_connected
yield from self.client_connected(listener_name, StreamReaderAdapter(reader), StreamWriterAdapter(writer))
File "/usr/local/lib/python3.5/site-packages/hbmqtt/broker.py", line 336, in client_connected
handler, client_session = yield from BrokerProtocolHandler.init_from_connect(reader, writer, self.plugins_manager, loop=self._loop)
File "/usr/local/lib/python3.5/site-packages/hbmqtt/mqtt/protocol/broker_handler.py", line 147, in init_from_connect
connect.variable_header.protocol_level)
AttributeError: 'ConnectVariableHeader' object has no attribute 'protocol_level'
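The CONNACK rules quoted in these reports can be summarised in a small decision function. This is only a minimal sketch and not hbmqtt's code: `connack_return_code` and its parameters are hypothetical names, and the server is assumed to support protocol level 4 (MQTT 3.1.1) only.

```python
# CONNACK return codes defined by MQTT 3.1.1 (subset used here).
CONNECTION_ACCEPTED = 0x00
UNACCEPTABLE_PROTOCOL_VERSION = 0x01
IDENTIFIER_REJECTED = 0x02

# Assumption: the server only speaks MQTT 3.1.1 (protocol level 4).
SUPPORTED_PROTOCOL_LEVELS = {4}


def connack_return_code(protocol_level, client_id, clean_session):
    """Return (return_code, close_connection) for a decoded CONNECT packet.

    Hypothetical helper: the arguments stand for fields that were already
    read from the CONNECT variable header and payload.
    """
    if protocol_level not in SUPPORTED_PROTOCOL_LEVELS:
        code = UNACCEPTABLE_PROTOCOL_VERSION  # [MQTT-3.1.2-2]
    elif client_id == "" and not clean_session:
        code = IDENTIFIER_REJECTED  # [MQTT-3.1.3-8]
    else:
        code = CONNECTION_ACCEPTED
    # A non-zero return code must be followed by closing the connection
    # [MQTT-3.2.2-5].
    return code, code != CONNECTION_ACCEPTED
```

Returning the "close" decision together with the code keeps the two requirements (send CONNACK, then close) in one place, so a caller cannot send a rejection and forget to disconnect.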
[MQTT-3.3.2-2]
The Topic Name in the PUBLISH Packet MUST NOT contain wildcard characters.
I encountered a memory leak when using the websocket protocol (ws://). It occurs in the WebSocketsReader.read() method, because self._protocol.recv() blocks until a websocket message is received.
I believe the websockets library should be used with a single recv loop; otherwise you might end up stacking coroutines that are all blocked in recv.
I was not able to fix it right now, but the problem is definitely there. Any chance you could look into it?
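The "single recv loop" idea can be sketched as follows. This is an illustration under assumptions, not a patch for hbmqtt: `BufferedWebSocketReader` is a hypothetical name, the protocol object is assumed to expose an awaitable `recv()` returning bytes, and it is assumed to raise `ConnectionError` when the peer closes (with the websockets library the exception to catch would be its own `ConnectionClosed`).

```python
import asyncio


class BufferedWebSocketReader:
    """Hypothetical reader: exactly one coroutine ever awaits recv()."""

    def __init__(self, protocol):
        self._protocol = protocol      # any object with `async def recv()`
        self._queue = asyncio.Queue()  # messages handed over by pump()
        self._buffer = bytearray()     # bytes not yet consumed by read()

    async def pump(self):
        """Receive loop; run it once as a background task."""
        try:
            while True:
                self._queue.put_nowait(await self._protocol.recv())
        except ConnectionError:
            # Assumption: this is how the protocol signals a closed connection.
            self._queue.put_nowait(None)  # end-of-stream marker

    async def read(self, n):
        """Return up to n bytes, waiting on the queue instead of on recv()."""
        while len(self._buffer) < n:
            chunk = await self._queue.get()
            if chunk is None:
                self._queue.put_nowait(None)  # keep EOF visible to later reads
                break
            self._buffer.extend(chunk)
        data = bytes(self._buffer[:n])
        del self._buffer[:n]
        return data
```

A caller would start the loop once with `asyncio.ensure_future(reader.pump())` and then only call `reader.read(n)`. Because `read()` never calls `recv()` itself, cancelled or repeated reads cannot pile up pending `recv()` calls.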
PyPI only hosts .whl packages, which makes non-wheel installations impossible. Please consider uploading source distributions as well. Thanks.
Running the hbmqtt tests produces a lot of error messages, for example:
pytest -s test_client.py::MQTTClientTest::test_ping
============================================================================================ test session starts ============================================================================================
platform linux -- Python 3.4.3, pytest-3.0.2, py-1.4.31, pluggy-0
========================================================================================= 1 passed in 0.11 seconds ==========================================================================================
[2016-09-07 10:14:43,964] asyncio {base_events.py:1008} ERROR - Task was destroyed but it is pending!
task: <Task pending coro=<stream_connected() running at /home/adona/dev/hbmqtt/hbmqtt/broker.py:336> wait_for=>
[2016-09-07 10:14:43,964] asyncio {base_events.py:1008} ERROR - Task was destroyed but it is pending!
task: <Task pending coro=<wait_disconnect() running at /home/adona/dev/hbmqtt/hbmqtt/mqtt/protocol/broker_handler.py:44> wait_for= cb=[_wait.<locals>._on_completion() at /usr/lib/python3.4/asyncio/tasks.py:399]>
[2016-09-07 10:14:43,964] asyncio {base_events.py:1008} ERROR - Task was destroyed but it is pending!
task: <Task pending coro=<get_next_pending_subscription() running at /home/adona/dev/hbmqtt/hbmqtt/mqtt/protocol/broker_handler.py:89> wait_for= cb=[_wait.<locals>._on_completion() at /usr/lib/python3.4/asyncio/tasks.py:399]>
[2016-09-07 10:14:43,964] asyncio {base_events.py:1008} ERROR - Task was destroyed but it is pending!
task: <Task pending coro=<get_next_pending_unsubscription() running at /home/adona/dev/hbmqtt/hbmqtt/mqtt/protocol/broker_handler.py:94> wait_for= cb=[_wait.<locals>._on_completion() at /usr/lib/python3.4/asyncio/tasks.py:399]>
[2016-09-07 10:14:43,964] asyncio {base_events.py:1008} ERROR - Task was destroyed but it is pending!
task: <Task pending coro=<mqtt_deliver_next_message() running at /home/adona/dev/hbmqtt/hbmqtt/mqtt/protocol/handler.py:462> wait_for= cb=[_wait.<locals>._on_completion() at /usr/lib/python3.4/asyncio/tasks.py:399]>
Exception ignored in: [2016-09-07 10:14:43,965] asyncio {base_events.py:1008} ERROR - Task was destroyed but it is pending!
task: <Task pending coro=<_reader_loop() running at /home/adona/dev/hbmqtt/hbmqtt/mqtt/protocol/handler.py:373> wait_for=>
[2016-09-07 10:14:43,965] hbmqtt.mqtt.protocol.handler {handler.py:432} WARNING - BrokerProtocolHandler Unhandled exception in reader coro: GeneratorExit()
[2016-09-07 10:14:43,965] hbmqtt.mqtt.protocol.handler {broker_handler.py:55} DEBUG - Client disconnecting
[2016-09-07 10:14:43,965] hbmqtt.mqtt.protocol.handler {broker_handler.py:57} DEBUG - Setting waiter result to None
Exception ignored in: <generator object _reader_loop at 0x7fd424ec1288>
Traceback (most recent call last):
File "/home/adona/dev/hbmqtt/hbmqtt/mqtt/protocol/handler.py", line 436, in _reader_loop
yield from self.handle_connection_closed()
File "/home/adona/dev/hbmqtt/hbmqtt/mqtt/protocol/broker_handler.py", line 62, in handle_connection_closed
yield from self.handle_disconnect(None)
File "/usr/lib/python3.4/asyncio/coroutines.py", line 141, in coro
res = func(*args, **kw)
File "/home/adona/dev/hbmqtt/hbmqtt/mqtt/protocol/broker_handler.py", line 58, in handle_disconnect
self._disconnect_waiter.set_result(disconnect)
File "/usr/lib/python3.4/asyncio/futures.py", line 339, in set_result
self._schedule_callbacks()
File "/usr/lib/python3.4/asyncio/futures.py", line 243, in _schedule_callbacks
self._loop.call_soon(callback, self)
File "/usr/lib/python3.4/asyncio/base_events.py", line 427, in call_soon
handle = self._call_soon(callback, args)
File "/usr/lib/python3.4/asyncio/base_events.py", line 436, in _call_soon
self._check_closed()
File "/usr/lib/python3.4/asyncio/base_events.py", line 265, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
How to fix that?
greetings
Attilio
Hi Folks,
I am evaluating hbmqtt for a video streaming scenario in which I publish MJPEG (a sequence of JPEGs) at 30-60 FPS. For test purposes I have created the following publisher using the paho client: https://github.com/veter-team/mechspeak/blob/master/src/publish2.py . It publishes all JPEG files found in the directory specified by the frames_dir variable at the beginning of the file. Subscribers are simple web applications, also written with the paho JS client. They receive the JPEGs and display them on a canvas. Relevant sources are:
https://github.com/veter-team/mechspeak/blob/master/www/videoobserver.html
https://github.com/veter-team/mechspeak/blob/master/www/js/utility.js
I was running the following configurations:
Publishing rate can be adjusted by editing the sleep interval here:
https://github.com/veter-team/mechspeak/blob/master/src/publish2.py#L38
Running the test on an RPi at approx. 30 FPS or in the VM at approx. 60 FPS crashes the hbmqtt server after about 10 seconds of operation. Please see the attached file for my configuration file and the logs generated by hbmqtt.
[MQTT-4.7.3-1]
All Topic Names and Topic Filters MUST be at least one character long.
[2016-04-05 08:33:03,302] :: WARNING - MQTTClient296651 Received reserved packet, which is forbidden: closing connection
This issue is related to issue #41.
There is another problem, which existed before and persists with the fix for issue #41. The single-level wildcard + should not match more than one level, but the replacement regex matches level separators (/) as well as other characters. So I guess replace('+', '[/\$\s\w\d]+') should be changed to something like replace('+', '[^/]+'), i.e. it should only match characters other than /.
I should note that I did not evaluate this fix thoroughly either. I just checked with a few strings and did not test special cases such as zero-length topic names.
However, I think issue #41 would still persist when wildcards are used: since the regex matches partially, subscribing to a/+/b would also deliver messages published to a/something/b/anotherthing.
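To make the intended wildcard semantics concrete, here is a minimal sketch of a filter-to-regex translation. It is not the broker's implementation: `topic_filter_to_regex` and `topic_matches` are hypothetical helpers, the filter is assumed to be syntactically valid already, and the special treatment of topics starting with $ is left out.

```python
import re


def topic_filter_to_regex(topic_filter):
    """Compile an MQTT topic filter into a regular expression (sketch)."""
    levels = topic_filter.split("/")
    multi_level = levels[-1] == "#"
    if multi_level:
        levels = levels[:-1]
    # "+" stands for exactly one level: any run of characters except "/"
    # (a level may be empty). Literal levels are escaped, which also
    # covers "$".
    body = "/".join(
        "[^/]*" if level == "+" else re.escape(level) for level in levels
    )
    if multi_level:
        # A trailing "#" matches the parent level and everything below it.
        body = (body + "(/.*)?") if levels else ".*"
    return re.compile(body)


def topic_matches(topic_filter, topic):
    """True if the whole topic name matches the filter."""
    return topic_filter_to_regex(topic_filter).fullmatch(topic) is not None
```

Using `fullmatch` instead of `match` is what rules out the partial match described for a/+/b: `topic_matches("a/+/b", "a/something/b/anotherthing")` is false, while `topic_matches("a/+/b", "a/something/b")` is true.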
Currently, all session data managed by a broker is stored in memory, which limits its use in serious deployments.
The HBMQTT broker should be able to persist session data (while clients are offline).
The implementation should be based on the plugin system, which would allow different persistence engines to be used.
This error was reported while running an MQTT v3.1.1 compliance test against hbmqtt:
[MQTT-3.3.1-1]
The DUP flag MUST be set to 1 by the Client or Server when it attempts to re-deliver a PUBLISH Packet.
[2016-04-05 06:44:39,596] :: INFO - Listener 'default': 1 connections acquired
[2016-04-05 06:44:40,605] :: INFO - Listener 'default': 2 connections acquired
[2016-04-05 06:44:40,606] :: INFO - Connection from 127.0.0.1:58478 on listener 'default'
[2016-04-05 06:44:40,607] :: WARNING - Delete session : session MQTTClient601540 doesn't exist
[2016-04-05 06:44:42,612] :: INFO - Listener 'default': 1 connections acquired
[2016-04-05 06:44:43,619] :: INFO - Listener 'default': 2 connections acquired
[2016-04-05 06:44:43,619] :: INFO - Connection from 127.0.0.1:58479 on listener 'default'
[2016-04-05 06:44:43,621] :: WARNING - Delete session : session MQTTClient615657 doesn't exist
[2016-04-05 06:45:04,629] :: WARNING - Can't add PUBREL waiter, a waiter already exists for message Id '2'
Traceback (most recent call last):
File "/usr/local/bin/hbmqtt", line 11, in <module>
sys.exit(main())
File "/usr/local/lib/python3.5/site-packages/scripts/broker_script.py", line 73, in main
loop.run_forever()
File "/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/base_events.py", line 301, in run_forever
self._run_once()
File "/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/base_events.py", line 1198, in _run_once
handle._run()
File "/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/events.py", line 125, in _run
self._callback(*self._args)
File "/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py", line 239, in _step
result = coro.send(value)
File "/usr/local/lib/python3.5/site-packages/hbmqtt/mqtt/protocol/handler.py", line 559, in handle_publish
yield from self._handle_message_flow(incoming_message)
File "/usr/local/lib/python3.5/site-packages/hbmqtt/mqtt/protocol/handler.py", line 203, in _handle_message_flow
yield from self._handle_qos2_message_flow(app_message)
File "/usr/local/lib/python3.5/site-packages/hbmqtt/mqtt/protocol/handler.py", line 339, in _handle_qos2_message_flow
raise HBMQTTException(message)
hbmqtt.errors.HBMQTTException: Can't add PUBREL waiter, a waiter already exists for message Id '2'
[2016-04-05 06:45:04,690] :: ERROR - Task was destroyed but it is pending!
task: <Task pending coro=<coro() running at /usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/coroutines.py:198>>
[2016-04-05 06:45:04,690] :: ERROR - Task was destroyed but it is pending!
task: <Task pending coro=<coro() running at /usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/coroutines.py:198>>
[2016-04-05 06:45:04,700] :: ERROR - Task was destroyed but it is pending!
task: <Task pending coro=<Broker._broadcast_loop() running at /usr/local/lib/python3.5/site-packages/hbmqtt/broker.py:624> wait_for=<Future pending cb=[Task._wakeup()]>>
[2016-04-05 06:45:04,701] :: ERROR - Task exception was never retrieved
future: <Task finished coro=<ProtocolHandler.handle_publish() done, defined at /usr/local/lib/python3.5/site-packages/hbmqtt/mqtt/protocol/handler.py:552> exception=HBMQTTException("Can't add PUBREL waiter, a waiter already exists for message Id '2'",)>
Traceback (most recent call last):
File "/usr/local/bin/hbmqtt", line 11, in <module>
sys.exit(main())
File "/usr/local/lib/python3.5/site-packages/scripts/broker_script.py", line 73, in main
loop.run_forever()
File "/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/base_events.py", line 301, in run_forever
self._run_once()
File "/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/base_events.py", line 1198, in _run_once
handle._run()
File "/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/events.py", line 125, in _run
self._callback(*self._args)
File "/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py", line 239, in _step
result = coro.send(value)
File "/usr/local/lib/python3.5/site-packages/hbmqtt/mqtt/protocol/handler.py", line 559, in handle_publish
yield from self._handle_message_flow(incoming_message)
File "/usr/local/lib/python3.5/site-packages/hbmqtt/mqtt/protocol/handler.py", line 203, in _handle_message_flow
yield from self._handle_qos2_message_flow(app_message)
File "/usr/local/lib/python3.5/site-packages/hbmqtt/mqtt/protocol/handler.py", line 339, in _handle_qos2_message_flow
raise HBMQTTException(message)
hbmqtt.errors.HBMQTTException: Can't add PUBREL waiter, a waiter already exists for message Id '2'
[2016-04-05 06:45:04,704] :: ERROR - Task was destroyed but it is pending!
task: <Task pending coro=<ProtocolHandler._reader_loop() running at /usr/local/lib/python3.5/site-packages/hbmqtt/mqtt/protocol/handler.py:369> wait_for=<Future pending cb=[Task._wakeup()]>>
[2016-04-05 06:45:04,704] :: ERROR - Task was destroyed but it is pending!
task: <Task pending coro=<MQTTFixedHeader.from_stream() running at /usr/local/lib/python3.5/site-packages/hbmqtt/mqtt/packet.py:69> cb=[_release_waiter(<Future pendi...sk._wakeup()]>)() at /usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py:336]>
[2016-04-05 06:45:04,704] :: WARNING - BrokerProtocolHandler Unhandled exception in reader coro: GeneratorExit()
Exception ignored in: <generator object ProtocolHandler._reader_loop at 0x1106600a0>
Traceback (most recent call last):
File "/usr/local/lib/python3.5/site-packages/hbmqtt/mqtt/protocol/handler.py", line 431, in _reader_loop
File "/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py", line 214, in cancel
File "/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/futures.py", line 227, in cancel
File "/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/futures.py", line 242, in _schedule_callbacks
File "/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/base_events.py", line 453, in call_soon
File "/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/base_events.py", line 462, in _call_soon
File "/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/base_events.py", line 289, in _check_closed
RuntimeError: Event loop is closed
[2016-04-05 06:45:04,705] :: ERROR - Task was destroyed but it is pending!
task: <Task pending coro=<ProtocolHandler.handle_publish() running at /usr/local/lib/python3.5/site-packages/hbmqtt/mqtt/protocol/handler.py:559> wait_for=<Future cancelled>>
[2016-04-05 06:45:04,705] :: ERROR - Task was destroyed but it is pending!
task: <Task pending coro=<BrokerProtocolHandler.wait_disconnect() running at /usr/local/lib/python3.5/site-packages/hbmqtt/mqtt/protocol/broker_handler.py:44> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py:400]>
[2016-04-05 06:45:04,706] :: ERROR - Task was destroyed but it is pending!
task: <Task pending coro=<BrokerProtocolHandler.get_next_pending_unsubscription() running at /usr/local/lib/python3.5/site-packages/hbmqtt/mqtt/protocol/broker_handler.py:94> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py:400]>
[2016-04-05 06:45:04,706] :: ERROR - Task was destroyed but it is pending!
task: <Task pending coro=<BrokerProtocolHandler.get_next_pending_subscription() running at /usr/local/lib/python3.5/site-packages/hbmqtt/mqtt/protocol/broker_handler.py:89> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py:400]>
[2016-04-05 06:45:04,706] :: ERROR - Task was destroyed but it is pending!
task: <Task pending coro=<ProtocolHandler.mqtt_deliver_next_message() running at /usr/local/lib/python3.5/site-packages/hbmqtt/mqtt/protocol/handler.py:457> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py:400]>
[2016-04-05 06:45:04,707] :: ERROR - Task was destroyed but it is pending!
task: <Task pending coro=<stream_connected() running at /usr/local/lib/python3.5/site-packages/hbmqtt/broker.py:321> wait_for=<Future pending cb=[Task._wakeup()]>>
[MQTT-3.1.2-22]
If the User Name Flag is set to 0, the Password Flag MUST be set to 0.
According to the specification, MQTT brokers and clients must close the network connection when a protocol violation occurs:
Unless stated otherwise, if either the Server or Client encounters a protocol violation, it MUST close the Network Connection on which it received that Control Packet which caused the protocol violation [MQTT-4.8.0-1]
However, I have noticed that hbmqtt handles malformed packets or illegally sent packets in some cases by keeping the network connection open but ignoring all other subsequently received packets. In other cases, illegally sent packets are ignored but normal operation is continued.
Here are a few examples with the corresponding specification statements:
The Server MUST treat a SUBSCRIBE packet as malformed and close the Network Connection if any of Reserved bits in the payload are non-zero, or QoS is not 0,1 or 2 [MQTT-3-8.3-4].
A PUBLISH Packet MUST NOT have both QoS bits set to 1. If a Server or Client receives a PUBLISH Packet which has both QoS bits set to 1 it MUST close the Network Connection [MQTT-3.3.1-4].
[MQTT-3.1.0-1] After a Network Connection is established by a Client to a Server, the first Packet sent from the Client to the Server MUST be a CONNECT Packet.
and [MQTT-4.8.0-1] cited above.
A Client can only send the CONNECT Packet once over a Network Connection. The Server MUST process a second CONNECT Packet sent from a Client as a protocol violation and disconnect the Client [MQTT-3.1.0-2].
The DUP flag MUST be set to 0 for all QoS 0 messages [MQTT-3.3.1-2]
and [MQTT-4.8.0-1] cited above.
Hence, the network connection should probably be closed. However, I looked at four other implementations, all of which simply ignore the DUP flag and forward the message. This probably suits users' needs better, although it technically violates the specification.
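The two PUBLISH-related statements can be checked directly on the flag bits of the fixed header. The following is a minimal sketch; `publish_flags_violation` is a hypothetical helper, not part of hbmqtt, and whether to close the connection on a DUP/QoS 0 mismatch remains the policy question raised above.

```python
def publish_flags_violation(flags):
    """Return a reason string if the 4-bit PUBLISH flags are illegal, else None.

    Flag layout of the PUBLISH fixed header in MQTT 3.1.1:
    bit 3 = DUP, bits 2-1 = QoS, bit 0 = RETAIN.
    """
    dup = bool(flags & 0x08)
    qos = (flags >> 1) & 0x03
    if qos == 3:
        return "both QoS bits set [MQTT-3.3.1-4]"
    if qos == 0 and dup:
        return "DUP set on a QoS 0 message [MQTT-3.3.1-2]"
    return None
```

A strict receiver would close the network connection whenever this returns a reason; a lenient one could log the DUP case and continue, as the other implementations mentioned above apparently do.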
For each transmission of my mqtt client I see this message (with different IDs) in my logs:
hbmqtt.broker: Delete session : session paho/AD96ADC1E3EF4820F0 doesn't exist
For the client I use paho.mqtt.publish.single. The transmission of data is working fine.
Can I simply ignore the message? Do I need to look into the broker's code? Do I need to change parameters on the client?
Hi,
I would like to run an authentication plugin. I read in the documentation that this requires declaring entry_points in setup.py, but is there a simpler way?
[MQTT-1.5.3-1]
The character data in a UTF-8 encoded string MUST be well-formed UTF-8 as defined by the Unicode specification [Unicode] and restated in RFC 3629 [RFC3629]. In particular this data MUST NOT include encodings of code points between U+D800 and U+DFFF. If a Server or Client receives a Control Packet containing ill-formed UTF-8 it MUST close the Network Connection.
[MQTT-1.5.3-2]
A UTF-8 encoded string MUST NOT include an encoding of the null character U+0000. If a receiver (Server or Client) receives a Control Packet containing U+0000 it MUST close the Network Connection.
[MQTT-1.5.3-3]
A UTF-8 encoded sequence 0xEF 0xBB 0xBF is always to be interpreted to mean U+FEFF ("ZERO WIDTH NO-BREAK SPACE") wherever it appears in a string and MUST NOT be skipped over or stripped off by a packet receiver.
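These three string rules map fairly directly onto Python's strict UTF-8 decoder. The sketch below uses a hypothetical helper name, `decode_mqtt_string`, and assumes the caller closes the network connection when `ValueError` is raised.

```python
def decode_mqtt_string(raw):
    """Decode the bytes of an MQTT UTF-8 string or raise ValueError."""
    try:
        # Strict decoding rejects malformed sequences and encoded surrogates
        # (U+D800..U+DFFF). The plain "utf-8" codec, unlike "utf-8-sig",
        # keeps 0xEF 0xBB 0xBF as U+FEFF instead of stripping it.
        text = raw.decode("utf-8", errors="strict")
    except UnicodeDecodeError as exc:
        raise ValueError("ill-formed UTF-8 in MQTT string") from exc
    if "\x00" in text:
        raise ValueError("U+0000 is not allowed in MQTT strings")
    return text
```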
It's very hard to follow the documentation because the examples don't show where the imports come from. To run an example, I have to look through the docs and see where each name is imported from. Please add imports to the doc examples.
I am trying to add HBMQTT to Home-Assistant. I used the default config as indicated here:
mqtt:
  embedded:
I get the following error:
homeassistant.bootstrap: Invalid config for [mqtt]: expected dict for dictionary value @ data['mqtt']['embedded']. Got None
@balloob suggested adding {} after embedded, so I modified the config to:
mqtt:
  embedded: {}
Now, I get the following errors:
16-09-07 21:33:57 homeassistant.components.mqtt.server: Error initializing MQTT server
Traceback (most recent call last):
File "/home/hass/.homeassistant/deps/hbmqtt/broker.py", line 184, in _build_listeners_config
listeners_config = broker_config['listeners']
KeyError: 'listeners'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/srv/hass/hass_venv/lib/python3.4/site-packages/homeassistant/components/mqtt/server.py", line 53, in start
loop.run_until_complete(start_server)
File "/usr/lib/python3.4/asyncio/base_events.py", line 271, in run_until_complete
self.run_forever()
File "/usr/lib/python3.4/asyncio/base_events.py", line 244, in run_forever
self._run_once()
File "/usr/lib/python3.4/asyncio/base_events.py", line 1081, in _run_once
handle._run()
File "/usr/lib/python3.4/asyncio/events.py", line 120, in _run
self._callback(*self._args)
File "/usr/lib/python3.4/asyncio/tasks.py", line 237, in _step
result = next(coro)
File "/srv/hass/hass_venv/lib/python3.4/site-packages/homeassistant/components/mqtt/server.py", line 23, in broker_coro
broker = Broker(config, loop)
File "/home/hass/.homeassistant/deps/hbmqtt/broker.py", line 156, in __init__
self._build_listeners_config(self.config)
File "/home/hass/.homeassistant/deps/hbmqtt/broker.py", line 191, in _build_listeners_config
raise BrokerException("Listener config not found invalid: %s" % ke)
hbmqtt.broker.BrokerException: Listener config not found invalid: 'listeners'
16-09-07 21:33:57 homeassistant.components.mqtt: Unable to start broker and auto-configure MQTT.
16-09-07 21:33:57 homeassistant.bootstrap: component mqtt failed to initialize
16-09-07 21:33:57 asyncio: Task exception was never retrieved
future: <Task finished coro=<broker_coro() done, defined at /srv/hass/hass_venv/lib/python3.4/site-packages/homeassistant/components/mqtt/server.py:19> exception=BrokerException("Listener config not found invalid: 'listeners'",)>
Traceback (most recent call last):
File "/home/hass/.homeassistant/deps/hbmqtt/broker.py", line 184, in _build_listeners_config
listeners_config = broker_config['listeners']
KeyError: 'listeners'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/srv/hass/hass_venv/lib/python3.4/site-packages/homeassistant/components/mqtt/server.py", line 53, in start
loop.run_until_complete(start_server)
File "/usr/lib/python3.4/asyncio/base_events.py", line 271, in run_until_complete
self.run_forever()
File "/usr/lib/python3.4/asyncio/base_events.py", line 244, in run_forever
self._run_once()
File "/usr/lib/python3.4/asyncio/base_events.py", line 1081, in _run_once
handle._run()
File "/usr/lib/python3.4/asyncio/events.py", line 120, in _run
self._callback(*self._args)
File "/usr/lib/python3.4/asyncio/tasks.py", line 237, in _step
result = next(coro)
File "/srv/hass/hass_venv/lib/python3.4/site-packages/homeassistant/components/mqtt/server.py", line 23, in broker_coro
broker = Broker(config, loop)
File "/home/hass/.homeassistant/deps/hbmqtt/broker.py", line 156, in __init__
self._build_listeners_config(self.config)
File "/home/hass/.homeassistant/deps/hbmqtt/broker.py", line 191, in _build_listeners_config
raise BrokerException("Listener config not found invalid: %s" % ke)
hbmqtt.broker.BrokerException: Listener config not found invalid: 'listeners'
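The KeyError in the traceback shows the broker looking up broker_config['listeners'], which an empty embedded: {} section cannot provide. One possible workaround is to merge a default listener into the configuration before it is handed to Broker. This is a sketch under assumptions: `with_default_listeners` is a hypothetical helper, and the default listener (plain TCP on 0.0.0.0:1883) is an assumption modelled on the "Listener 'default' bind to 0.0.0.0:1883" log lines elsewhere on this page.

```python
import copy

# Assumption: one plain-TCP listener named 'default' on the standard MQTT port.
DEFAULT_LISTENERS = {"default": {"type": "tcp", "bind": "0.0.0.0:1883"}}


def with_default_listeners(config):
    """Return a copy of config that always contains a 'listeners' section."""
    merged = copy.deepcopy(config) if config else {}
    # Only fill in the section when the user did not configure listeners.
    merged.setdefault("listeners", copy.deepcopy(DEFAULT_LISTENERS))
    return merged
```

The result could then be passed on in place of the raw configuration, for example `Broker(with_default_listeners(config), loop)`, so that an empty embedded section no longer reaches `_build_listeners_config` without listeners.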
Hi,
I noticed that HBMQTTException and ClientException both derive from BaseException. The docs say that asyncio logs a message when it sees an unconsumed exception. However, an exception derived from BaseException also aborts the whole event loop, even though other tasks are running just fine.
Is there a way to solve this problem? Or may I suggest changing them to derive from Exception instead?
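The difference between the two base classes shows up in any generic `except Exception` handler, which is the kind of handler most application code relies on. The class names below are hypothetical stand-ins, not hbmqtt's classes.

```python
class LegacyError(BaseException):
    """Stand-in for an exception type derived from BaseException."""


class ProposedError(Exception):
    """Stand-in for the same type derived from Exception instead."""


def caught_by_generic_handler(exc_type):
    """Report whether an `except Exception` clause catches exc_type."""
    try:
        raise exc_type("boom")
    except Exception:
        return True
    except BaseException:
        # Only reached for types outside the Exception hierarchy, i.e. the
        # same category as KeyboardInterrupt and SystemExit.
        return False
```

Here `caught_by_generic_handler(ProposedError)` returns True and `caught_by_generic_handler(LegacyError)` returns False, which is why a BaseException-derived error escapes handlers that are meant to isolate a single failing task.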
[MQTT-4.7.1-3]
The single-level wildcard can be used at any level in the Topic Filter, including first and last levels. Where it is used it MUST occupy an entire level of the filter.
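A syntactic check for this rule can be sketched as below. `is_valid_topic_filter` is a hypothetical helper; besides the quoted statement it also applies the minimum-length rule [MQTT-4.7.3-1] and the specification's requirement that the multi-level wildcard # stands alone in the last level.

```python
def is_valid_topic_filter(topic_filter):
    """Check the wildcard placement rules of an MQTT topic filter (sketch)."""
    if not topic_filter:
        return False  # at least one character long [MQTT-4.7.3-1]
    levels = topic_filter.split("/")
    for index, level in enumerate(levels):
        if "+" in level and level != "+":
            return False  # "+" must occupy an entire level [MQTT-4.7.1-3]
        if "#" in level and (level != "#" or index != len(levels) - 1):
            return False  # "#" must be alone in the last level
    return True
```

With this check, `+`, `+/tennis/#` and `sport/+/player1` are accepted, while `sport+` and `sport/#/player1` are rejected.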
Is it possible to generate a file on a server in which each line is written by publishing to a certain topic in hbmqtt?
In other words: the broker would listen to a specific topic and store the published data in a given text file.
How can this be done?
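One way to approach this is a small subscriber that runs next to the broker and appends every payload to a file. The sketch below is an assumption-laden illustration rather than an official recipe: `append_line` and `log_topic` are hypothetical names, the hbmqtt calls follow the client API as shown in the project's documentation samples (MQTTClient, connect, subscribe, deliver_message), and the broker URL, topic and path are placeholders to be supplied by the caller.

```python
import asyncio


def append_line(path, payload):
    """Append one published payload to the file as a single text line."""
    text = bytes(payload).decode("utf-8", errors="replace")
    with open(path, "a", encoding="utf-8") as handle:
        # Embedded newlines are flattened so one message stays on one line.
        handle.write(text.replace("\n", " ") + "\n")


async def log_topic(broker_url, topic, path):
    """Subscribe to topic and append every received payload to path."""
    # Imported lazily so append_line() is usable without hbmqtt installed.
    from hbmqtt.client import MQTTClient
    from hbmqtt.mqtt.constants import QOS_1

    client = MQTTClient()
    await client.connect(broker_url)
    await client.subscribe([(topic, QOS_1)])
    try:
        while True:
            message = await client.deliver_message()
            append_line(path, message.data)
    finally:
        await client.disconnect()
```

The coroutine would be started with something like `asyncio.get_event_loop().run_until_complete(log_topic(url, topic, path))`. Keeping the file handling in a separate function makes it easy to swap the storage format without touching the MQTT part.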
file: hbmqtt/samples/client_subscribe.py
ClientException is not imported and raises an error if the exception gets thrown.
Fix:
change line 4 to
from hbmqtt.client import MQTTClient, ConnectException, ClientException
[2016-04-05 07:09:02,990] :: INFO - Listener 'default' bind to 0.0.0.0:1883 (max_connecionts=-1)
[2016-04-05 07:09:36,077] :: INFO - Listener 'default': 1 connections acquired
[2016-04-05 07:09:36,077] :: INFO - Connection from 127.0.0.1:59001 on listener 'default'
[2016-04-05 07:09:36,079] :: WARNING - Delete session : session MQTTClient70625 doesn't exist
[2016-04-05 07:09:54,155] :: WARNING - Received PUBREL for unknown pending message with Id: 26471
[2016-04-05 07:09:54,158] :: WARNING - Received PUBREL for unknown pending message with Id: 26471
[2016-04-05 07:09:54,160] :: WARNING - Received PUBREL for unknown pending message with Id: 26471
[2016-04-05 07:09:54,160] :: WARNING - Received PUBREL for unknown pending message with Id: 26471
[2016-04-05 07:09:54,161] :: WARNING - Received PUBREL for unknown pending message with Id: 25090
[2016-04-05 07:09:54,162] :: WARNING - MQTTClient70625 Received reserved packet, which is forbidden: closing connection
[2016-04-05 07:09:54,165] :: INFO - Listener 'default': 0 connections acquired
Hierarchical topics are not handled correctly. More concretely, if I subscribe to a topic at a high level in the hierarchy I will also receive messages published at a lower level.
I tested the following simple example:
Connect client C1 and receive ConnAck
Subscribe with C1 to "my_topic" and receive SubAck
Publish "my_message" with C1 and QoS 1 to "my_topic/other_topic" and receive PubAck and additionally a Publish message for "my_topic/other_topic" with message "my_message"
Thus, the subscription to "my_topic" works like a subscription to "my_topic/#". I would interpret this as a violation of the MQTT standard. I also checked with emqtt and mosquitto, neither of which sends a Publish in response to the last step.
A quick fix would be to change Line 623 from
match_pattern = re.compile(a_filter.replace('#', '.*').replace('$', '\$').replace('+', '[/\$\s\w\d]+'))
to
match_pattern = re.compile("^"+a_filter.replace('#', '.*').replace('$', '\$').replace('+', '[/\$\s\w\d]+')+"$")
However, I am neither a Python-expert nor did I test the fix thoroughly.
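The effect of the added ^ and $ anchors can be reproduced in isolation. This sketch assumes the compiled pattern is applied with a prefix-style `match`, which the quoted line alone does not show; both function names are hypothetical.

```python
import re


def matches_unanchored(pattern, topic):
    """re.match anchors only at the start, so a matching prefix is enough."""
    return re.match(pattern, topic) is not None


def matches_anchored(pattern, topic):
    """re.fullmatch requires the whole topic to match, like '^...$'."""
    return re.fullmatch(pattern, topic) is not None
```

For the example in this report, `matches_unanchored("my_topic", "my_topic/other_topic")` is true while `matches_anchored("my_topic", "my_topic/other_topic")` is false, which corresponds to the observed and the expected broker behaviour respectively.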
Using a simple mosquitto broker over the websockets protocol, the method deliver_message seems to hang for a long time when sending big messages (we use hbmqtt to publish JSON payloads). The longer the message is, the longer the method .recv() in the WebSocketsReader hangs.
Is this a known issue?
Changing the protocol to mqtt works like a charm, so it does not appear to be a broker problem.
Hello,
I cannot connect an MQTT client (running in the browser, using mqtt.js) to the hbmqtt broker.
The broker prints:
INFO :: websockets.protocol :: Failing the WebSocket connection: 1006
The browser prints:
WebSocket connection to 'ws://127.0.0.1:8888/' failed: Error during WebSocket handshake: Sent non-empty 'Sec-WebSocket-Protocol' header but no response was received
Is this broker incompatible with a browser mqtt over web sockets client?
Thanks in advance,
Vasco Santos
Ran:
hbmqtt_sub --url mqtt://<broker_address>:<broker_port> -t '$SYS/#' -d
Got back a lot of stuff like:
[2016-05-06 05:47:36,762] :: DEBUG - hbmqtt_sub/<client_id> <-in-- PublishPacket(ts=2016-05-06 05:47:36.761783, fixed=MQTTFixedHeader(length=45, flags=0x0), variable=PublishVariableHeader(topic=$SYS/broker/$SYS/broker/load/bytes/sent, packet_id=None), payload=PublishPayload(data="bytearray(b'1993')"))
When I ran it all through a Regular Expression filter, I got:
i.e. the $SYS/broker is being repeated in the Topic, to become $SYS/broker/$SYS/broker.
I'm assuming this isn't correct.
I've upgraded to 0.7, but the client is reporting 0.6.3 when I connect.
Thanks!
...Andrew
Rewrite the existing wiki documentation in Sphinx.
Python 3.5.1+ (default, Mar 30 2016, 22:46:26)
Ubuntu 16.04 LTS
hbmqtt installed either with pip3 or from the latest GitHub sources.
Attempting to run hbmqtt causes the error below. Casting port to int, i.e. int(port), in broker.py at lines 256 and 263 seems to fix this issue. However, I did not check whether a similar problem occurs anywhere else.
[2016-06-10 14:35:13,583] :: ERROR - Broker startup failed: an integer is required (got type str)
Traceback (most recent call last):
File "/usr/local/lib/python3.5/dist-packages/hbmqtt/broker.py", line 258, in start
loop=self._loop)
File "/usr/lib/python3.5/asyncio/streams.py", line 116, in start_server
return (yield from loop.create_server(factory, host, port, **kwds))
File "/usr/lib/python3.5/asyncio/base_events.py", line 949, in create_server
sock.bind(sa)
TypeError: an integer is required (got type str)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/bin/hbmqtt", line 11, in
sys.exit(main())
File "/usr/local/lib/python3.5/dist-packages/scripts/broker_script.py", line 72, in main
loop.run_until_complete(broker.start())
File "/usr/lib/python3.5/asyncio/base_events.py", line 361, in run_until_complete
self.run_forever()
File "/usr/lib/python3.5/asyncio/base_events.py", line 331, in run_forever
self._run_once()
File "/usr/lib/python3.5/asyncio/base_events.py", line 1298, in _run_once
handle._run()
File "/usr/lib/python3.5/asyncio/events.py", line 125, in _run
self._callback(*self._args)
File "/usr/lib/python3.5/asyncio/tasks.py", line 308, in _wakeup
self._step()
File "/usr/lib/python3.5/asyncio/tasks.py", line 240, in _step
result = coro.send(None)
File "/usr/local/lib/python3.5/dist-packages/hbmqtt/broker.py", line 280, in start
raise BrokerException("Broker instance can't be started: %s" % e)
hbmqtt.broker.BrokerException: Broker instance can't be started: an integer is required (got type str)
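The int(port) fix suggested above can be illustrated with a small helper that converts the port while splitting the bind string. This is a sketch under assumptions, not the code in broker.py: the name `parse_bind` and the fallback port of 1883 are hypothetical, and IPv6 literals are not handled.

```python
def parse_bind(bind: str, default_port: int = 1883):
    """Split a 'host:port' string and return (host, port) with port as an int."""
    host, sep, port = bind.rpartition(":")
    if not sep:
        # No colon present: treat the whole string as the host name.
        return bind, default_port
    # str.split() yields strings, but socket binding needs an integer port.
    return host, int(port)
```

For example, `parse_bind("0.0.0.0:1883")` returns a tuple whose second element is the integer 1883 rather than the string "1883".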
Should the use of the timeout parameter generate the following on a timeout?
Traceback (most recent call last):
...
message = await self.C.deliver_message(timeout=3)
File "py3\lib\site-packages\hbmqtt\client.py", line 337, in deliver_message
if deliver_task.exception():
File "c:\python35\Lib\asyncio\futures.py", line 288, in exception
raise InvalidStateError('Exception is not set.')
asyncio.futures.InvalidStateError: Exception is not set.
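The traceback is consistent with exception() being called on a task that has not finished yet, which asyncio rejects with InvalidStateError. A minimal sketch of a timeout wrapper that avoids this is shown below. It is not the code in client.py: the helper name `wait_with_timeout` is hypothetical, and raising asyncio.TimeoutError on expiry is one possible design choice, not necessarily what the library intends.

```python
import asyncio


async def wait_with_timeout(coro, timeout):
    """Await coro for at most `timeout` seconds, raising TimeoutError on expiry."""
    task = asyncio.ensure_future(coro)
    done, _pending = await asyncio.wait([task], timeout=timeout)
    if task in done:
        # Only a finished task may be asked for its exception or result.
        if task.exception() is not None:
            raise task.exception()
        return task.result()
    # Timed out: the task is still pending, so task.exception() would raise
    # InvalidStateError here. Cancel the task and report the timeout instead.
    task.cancel()
    raise asyncio.TimeoutError()
```

The caller can then catch asyncio.TimeoutError around the call and decide whether to retry or give up.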
[MQTT-4.7.2-1]
The Server MUST NOT match Topic Filters starting with a wildcard character (# or +) with Topic Names beginning with a $ character.
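The rule can be expressed as a guard that runs before any wildcard matching. The sketch below is illustrative only; the function name `wildcard_may_match` is hypothetical and is not part of hbmqtt.

```python
def wildcard_may_match(topic_filter: str, topic_name: str) -> bool:
    """Return False when [MQTT-4.7.2-1] forbids the match outright."""
    # Filters whose first character is a wildcard must never match "$" topics.
    starts_with_wildcard = topic_filter.startswith(("#", "+"))
    return not (starts_with_wildcard and topic_name.startswith("$"))
```

A subscription to "#" is therefore rejected for "$SYS/broker/load", while an explicit "$SYS/#" subscription still passes the guard and goes on to normal matching.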
When connecting a client to the mqtt broker over a websocket I get the following error:
:: WARNING :: hbmqtt.broker :: [MQTT-3.1.0-1] (client @=192.168.0.101:53088): Can't read first packet an CONNECT: No more data
Reproduce:
Start hbmqtt broker
Connect e.g. by using this webclient: http://www.hivemq.com/demos/websocket-client/
This might be related to the following modification in websockets:
http://websockets.readthedocs.org/en/stable/changelog.html?highlight=legacy_recv
but even when using legacy websockets package I get the same behavior.
Thanks for publishing this here on github, I very much like this python mqtt broker
Provide startup scripts for broker, subscribe client and publish client.
The broker allows an event loop to be passed in, but this currently raises an error. From a quick search I found two places that need to be made aware of the event loop to be used:
Using basically the default configuration file with IP addresses added, I get an error when I try to use it:
`[2016-04-26 12:33:33,739] :: ERROR - Broker startup failed: 'bind'
Traceback (most recent call last):
File "/home/acoile/hbmqtt/lib/python3.5/site-packages/hbmqtt/broker.py", line 248, in *start
address, port = listener['bind'].split(':')
KeyError: 'bind'
*
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "bin/hbmqtt", line 11, in
sys.exit(main())
File "/home/acoile/hbmqtt/lib/python3.5/site-packages/scripts/broker_script.py", line 72, in main
loop.run_until_complete(broker.start())
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/base_events.py", line 325, in run_until_complete
self.run_forever()
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/base_events.py", line 295, in run_forever
self._run_once()
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/base_events.py", line 1254, in _run_once
handle._run()
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/events.py", line 125, in _run
self._callback(*self._args)
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py", line 239, in _step
result = coro.send(None)
File "/home/acoile/hbmqtt/lib/python3.5/site-packages/hbmqtt/broker.py", line 276, in start
raise BrokerException("Broker instance can't be started: %s" % e)
hbmqtt.broker.BrokerException: Broker instance can't be started: 'bind'
[2016-04-26 12:33:33,751] :: ERROR - Task was destroyed but it is pending!
task: <Task pending coro=<coro() running at /Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/coroutines.py:204>>`
It appears to be because there is no 'bind: x.y.z.t:p' entry for listener:default:
Shouldn't it be applying listeners:default: from hbmqtt's default configuration, then listeners:default: from this configuration, and then adding the specific items in each listeners: section, and just processing those sections? In other words, the defaults are items to be integrated into the configuration for each listeners:, not something that should be processed on their own.
Thanks for any help you can provide.
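The merging behaviour proposed above could look roughly like the following sketch. It illustrates the suggestion only and does not describe how hbmqtt currently processes its configuration; the function name `resolve_listeners` is hypothetical, and the listener names and keys in the usage example are placeholders.

```python
def resolve_listeners(listeners: dict) -> dict:
    """Merge the 'default' listener section into every other listener."""
    defaults = listeners.get("default") or {}
    resolved = {}
    for name, settings in listeners.items():
        if name == "default":
            # Defaults are only a template; they are not started on their own.
            continue
        merged = dict(defaults)         # start from the shared defaults
        merged.update(settings or {})   # listener-specific items take priority
        resolved[name] = merged
    return resolved


# Usage with placeholder listener names and keys.
config = {
    "default": {"type": "tcp", "max_connections": 50},
    "tcp-1": {"bind": "127.0.0.1:1883"},
    "ws-1": {"bind": "127.0.0.1:8080", "type": "ws"},
}
print(resolve_listeners(config))
```

With this approach a missing 'bind' in the default section is harmless, because only the named listeners are processed and each one supplies its own address.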