Coder Social home page Coder Social logo

amqtt's Introduction

MIT licensed ci coverage Documentation Status 'Join the chat at https://gitter.im/amqtt/community' Python Version supports python wheel PyPI

aMQTT

aMQTT is an open source MQTT client and broker implementation.

Built on top of asyncio, Python's standard asynchronous I/O framework, aMQTT provides a straightforward API based on coroutines, making it easy to write highly concurrent applications.

It was forked from HBMQTT after it was deprecated by the original author.

Features

aMQTT implements the full set of MQTT 3.1.1 protocol specifications and provides the following features:

  • Support QoS 0, QoS 1 and QoS 2 messages flow
  • Client auto-reconnection on network lost
  • Authentication through password file (more methods can be added through a plugin system)
  • Basic $SYS topics
  • TCP and websocket support
  • SSL support over TCP and websocket
  • Plugin system

Project Status and Roadmap

The current focus is to build setup the project infrastructure for the new fork. From there the goal is to fix outstanding known issues and clean up the code.

Version hbmqtt compatibility Supported Python Versions PyPi Release
0.10.x YES - Drop-in Replacement 3.7* 0.10.1
0.11.x NO - Module renamed and small API differences 3.7 - 3.10 No release yet
  • Due to a change in Python 3.8 where the semantics of asyncio.CancelledError was changed

    to be a subclass of BaseException instead of Exception, old versions of hbmqtt and aMQTT will break, see #133. Therefore only 3.7 is mentioned as supported version for 0.10.x.

Getting started

amqtt is available on Pypi and can installed simply using pip : :

$ pip install amqtt

Documentation is available on Read the Docs.

Bug reports, patches and suggestions welcome! Just open an issue or join the gitter channel.

amqtt's People

Contributors

akatrevorjay avatar andvikt avatar burnpanck avatar cdce8p avatar cfchou avatar clach04 avatar d21d3q avatar dansheme avatar edenhaus avatar erics465 avatar fabianelsmer avatar florianludwig avatar herrmuellerluedenscheid avatar hexdump42 avatar jcea avatar jodal avatar lrasku avatar luchermans avatar mbillow avatar mi3z avatar mitchbte avatar njouanin avatar pazzarpj avatar plbertrand avatar romancardenas avatar shipmints avatar sjlongland avatar thiswiederkehr avatar zhukovalexander avatar zyp avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

amqtt's Issues

Replay and retained message logic need work (biggest issue: inactive client sessions grow RAM forever)

Looks like this wasn't a fork but a copy of the repo. The outstanding issues won't have carried over so carrying over this reference from njouanin/hbmqtt#237

Session Message Replay Issues

This is a spot that is broken and currently (and correctly) disabled for qos 0 messages. See https://www.hivemq.com/blog/mqtt-essentials-part-7-persistent-session-queuing-messages/ for persistent session discussion where only QOS 1/2 should be retained for session-reconnect replay.

Here when a client has an active connection to the broker, it will be sent the pending message:

https://github.com/beerfactory/hbmqtt/blob/07c4c70f061003f208ad7e510b6e8fce4d7b3a6c/hbmqtt/broker.py#L709

When an old client session is recorded as having disconnected, every single message is enqueued waiting for the client to reconnect. If the client never reconnects, memory is consumed forever.

https://github.com/beerfactory/hbmqtt/blob/07c4c70f061003f208ad7e510b6e8fce4d7b3a6c/hbmqtt/broker.py#L719-L726

  • The logic assumes that when the client reconnects (assuming it's the same client by client id), it will be a dirty connection and will want the same subscriptions as it did the first time. I feel this is erroneous as one can only know the client's intentions after it has established its subscriptions for a clean session or stated clearly that it wants a non-clean, reused session.
  • The logic should not bother to retain session replay messages for anonymous client sessions. This is a related bug: If a client does not specify a client id, a random UUID is generated and that is stored forever in the broker session cache. This id (and session) will never be reused by the same client which will always send empty client id forcing a new random id to be created. The unreachable session will collect messages for a client connection that can never be reestablished, further exhausting memory.
  • Only messages with QOS 1 or QOS 2 should be put into this queue (hence the QOS 0 suppression in the current patched logic).
  • Configurable caps could be available to control the broker's behavior for the retained message queue either by: total bytes, number of messages, number of topics, list of specific topics.
  • The session collection of these messages probably should not be named retained_messages since its confusing. These are for high QOS guaranteed-delivery attempts.

Retained Topic Message Issues

Persistent topic subscription restoration looks like it needs work to ensure retained messages are provided when a non-clean session is restored the list of session topics is not reprocessed through the retained topic replay logic as it does when a new subscription is made.

Client session retrieved from cache but does not correctly reestablish topic subscriptions and will miss retained messages for those topics upon reconnection:

https://github.com/beerfactory/hbmqtt/blob/07c4c70f061003f208ad7e510b6e8fce4d7b3a6c/hbmqtt/broker.py#L381-L387

Here is where the logic for replaying the retained topic cache for new subscriptions is that should be reused for sessions pulled from the session cache:

https://github.com/beerfactory/hbmqtt/blob/07c4c70f061003f208ad7e510b6e8fce4d7b3a6c/hbmqtt/broker.py#L473-L480

Configurable caps could be available to control the broker's behavior for the topic retained messages similar to those proposed for the session message cache, above.

Add stack traces to logging

Also, small tip (which applies to amqtt in general too, I see the pattern there in lots of places):

            try:
                # … etc …
                pass
            except ConnectException as ce:
                logger.error("Connection failed: %s" % ce)

Not the best way to log an exception for two reasons:

  1. % always formats the string, whether logger is configured to emit logs at error level or not
  2. you miss out on the stack trace, which can be real handy for troubleshooting

logger.error("Connection failed: %s", ce) addresses (1) whilst ignoring (2). The logging module has the exc_info keyword argument, which pretty much will do this for you… I'd write the above as:

            try:
                # … etc …
                pass
            except ConnectException:
                logger.error("Connection failed", exc_info=True)

Or, for error, there's a shortcut:

            try:
                # … etc …
                pass
            except ConnectException:
                logger.exception("Connection failed")

Originally posted by @sjlongland in #75 (comment)

Incompatibility with Python 3.10: Event() no longer supports loop parameter

After solving the initialization issue as noted in #95, I bumped into the following incompatibility with 3.10:

Traceback (most recent call last):
  File "//shovel.py", line 23, in main
    from_client = await connect(from_url)
  File "//shovel.py", line 9, in connect
    client = MQTTClient()
  File "/usr/local/lib/python3.10/site-packages/amqtt/client.py", line 119, in __init__
    self._connected_state = asyncio.Event(loop=self._loop)
  File "/usr/local/lib/python3.10/asyncio/locks.py", line 167, in __init__
    super().__init__(loop=loop)
  File "/usr/local/lib/python3.10/asyncio/mixins.py", line 17, in __init__
    raise TypeError(
TypeError: As of 3.10, the *loop* parameter was removed from Event() since it is no longer necessary

I'll drop to 3.9 rather than figuring out how and where to fix this right now.

MQTTClient fails to raise appropriate exception if URI is broken

A broken URI leads to the following exception when trying to connect to an MQTT broker:

Unhandled exception: 'NoneType' object has no attribute 'write'
paxexpress-api | Connection failed: AttributeError("'NoneType' object has no attribute 'write'")
paxexpress-api | Unhandled exception: 'NoneType' object has no attribute 'write'
paxexpress-api | Reconnection attempt failed: AttributeError("'NoneType' object has no attribute 'write'")
paxexpress-api | Unhandled exception: 'NoneType' object has no attribute 'write'
paxexpress-api | Reconnection attempt failed: AttributeError("'NoneType' object has no attribute 'write'")
paxexpress-api | Unhandled exception: 'NoneType' object has no attribute 'write'
paxexpress-api | Reconnection attempt failed: AttributeError("'NoneType' object has no attribute 'write'")
paxexpress-api | Maximum number of connection attempts reached. Reconnection aborted
paxexpress-api | ERROR:    Traceback (most recent call last):
paxexpress-api |   File "/opt/pysetup/.venv/lib/python3.8/site-packages/amqtt/client.py", line 165, in connect
paxexpress-api |     return await self._do_connect()
paxexpress-api |   File "/opt/pysetup/.venv/lib/python3.8/site-packages/amqtt/client.py", line 254, in _do_connect
paxexpress-api |     return_code = await self._connect_coro()
paxexpress-api |   File "/opt/pysetup/.venv/lib/python3.8/site-packages/amqtt/client.py", line 468, in _connect_coro
paxexpress-api |     return_code = await self._handler.mqtt_connect()
paxexpress-api |   File "/opt/pysetup/.venv/lib/python3.8/site-packages/amqtt/mqtt/protocol/client_handler.py", line 79, in mqtt_connect
paxexpress-api |     await self._send_packet(connect_packet)
paxexpress-api |   File "/opt/pysetup/.venv/lib/python3.8/site-packages/amqtt/mqtt/protocol/handler.py", line 543, in _send_packet
paxexpress-api |     await packet.to_stream(self.writer)
paxexpress-api |   File "/opt/pysetup/.venv/lib/python3.8/site-packages/amqtt/mqtt/packet.py", line 218, in to_stream
paxexpress-api |     writer.write(self.to_bytes())
paxexpress-api | AttributeError: 'NoneType' object has no attribute 'write'

Broken URI: mqtt://"user":"pass"@eu1.thethings.network"

Expectation: A descriptive exception should already be raised when creating the session and mention that no writer could be instantiated due to failing to properly parse the URI.

Convert documentation to mkdocs-material

Writing markdown comes a lot easier to me (and it seems others as well) then ReStructuredText. So I suggest converting the docs to markdown and use mkdocs to generate nice looking html docs.

Plugin interface ineffective

The plugin event handling interface as defined in the manager.py is not really effective.

    async def fire_event(self, event_name, wait=False, *args, **kwargs):
        """
        Fire an event to plugins.
        PluginManager schedule async calls for each plugin on method called "on_" + event_name
        For example, on_connect will be called on event 'connect'
        Method calls are schedule in the asyn loop. wait parameter must be set to true to wait until all
        mehtods are completed.
        :param event_name:
        :param args:
        :param kwargs:
        :param wait: indicates if fire_event should wait for plugin calls completion (True), or not
        :return:
        """
        tasks = []
        event_method_name = "on_" + event_name
        for plugin in self._plugins:
            event_method = getattr(plugin.object, event_method_name, None)

For any event all plugins will be asked if they obtain a handler for the event. This is not even cached.

I propose the following change here:

The plugins should have an optional way have to register their event listeners to the manager for faster processing.

Discussion:
The relation between the event listeners and the event is static after starting the code.
With a minimal event registry (just a dict) the performance can be improved tremendously.

I volunteer to enhance the manager with an event registry, which can optionally be used by plugins to perform faster event handling.

Cheers,
Volker

Plugin config in yaml file not under - plugins entry

First at all I like to thank you to revive hbmqtt!

My aim is to build a well performing mqtt <-> timescaleDB gateway utilizing hbmqtt (and now amqtt).

I tried the persistence plugin. It is working, but the configuration is not nested correctly under the section -plugins:.
The plugin itself complains when the persistence information is not on top level.

(mqtt-broker-1iWrHb9Y-py3.7) pi@data:~/workspace/mqtt_broker $ hbmqtt -c broker.yaml -d
[2021-03-14 17:35:36,692] :: DEBUG - Using selector: EpollSelector
[2021-03-14 17:35:36,695] :: DEBUG - Loading plugins for namespace hbmqtt.broker.plugins
[2021-03-14 17:35:36,699] :: DEBUG -  Loading plugin persistence = hbmqtt.plugins.persistence:SQLitePlugin
[2021-03-14 17:35:36,790] :: DEBUG -  Initializing plugin persistence = hbmqtt.plugins.persistence:SQLitePlugin
[2021-03-14 17:35:36,790] :: WARNING - 'persistence' section not found in context configuration

Putting it on top level yields the correct behavior:

listeners:
  default:
    type: tcp
    bind: 0.0.0.0:1883
sys_interval: 20
auth:
  allow-anonymous: true
plugins:
  - auth_file
  - auth_anonymous

persistence:
  file : 'db.sqlite'

topic-check:
  enabled: False
(mqtt-broker-1iWrHb9Y-py3.7) pi@data:~/workspace/mqtt_broker $ hbmqtt -c broker.yaml -d
[2021-03-14 17:36:16,903] :: DEBUG - Using selector: EpollSelector
[2021-03-14 17:36:16,906] :: DEBUG - Loading plugins for namespace hbmqtt.broker.plugins
[2021-03-14 17:36:16,910] :: DEBUG -  Loading plugin persistence = hbmqtt.plugins.persistence:SQLitePlugin
[2021-03-14 17:36:16,987] :: DEBUG -  Initializing plugin persistence = hbmqtt.plugins.persistence:SQLitePlugin
[2021-03-14 17:36:16,998] :: INFO - Database file 'db.sqlite' opened

I will have a look at the plugin manager and the persistance plugin code and try to come up with a fix.

Cheers
Volker

Setup CI

The original repo used tavis.

We could also check out gitlab actions.

Personally, I don't care.

Data model for persistence

I am developing a data base persistence layer between amqtt and relational DBs with focus on timescaleDB.

All timeseries databases (except) timescaleDB have really simple data models. Usually a timestamp and an associated JSON like structure (BLOB) containig the session data. Some have tags and group the data by them running in cardinality problems.

I would like to bring the total force of timescaleDB into effect utilizing the "wide-tables" approach which is no more and no less simply one table per "Sender type". Look here .

My work is heavily influenced by the work of others, namely the timscaleDB plugin for telegraf, which is a work in progress documented in this telegraf pull request .

This thread is for general discussion on the topic DB data model.

Cheers,
Volker

test matrix for dependencies

we support version ranges for dependencies. We should run different combinations of the dependencies in our CI, just like with python versions.

0.10.0 trows exception on connect

Version: 1.10.0
Python: 3.8.10

Output broker:

▶ amqtt                                      
[2021-11-26 08:50:05,493] :: INFO - Finished processing state new exit callbacks.
[2021-11-26 08:50:05,493] :: INFO - Finished processing state starting enter callbacks.
[2021-11-26 08:50:05,493] :: INFO - ### 'on_broker_pre_start' EVENT FIRED ###
[2021-11-26 08:50:05,494] :: INFO - Listener 'default' bind to 0.0.0.0:1883 (max_connections=-1)
[2021-11-26 08:50:05,494] :: INFO - Finished processing state starting exit callbacks.
[2021-11-26 08:50:05,494] :: INFO - Finished processing state started enter callbacks.
[2021-11-26 08:50:05,494] :: INFO - ### 'on_broker_post_start' EVENT FIRED ###
[2021-11-26 08:50:09,831] :: INFO - Listener 'default': 1 connections acquired
[2021-11-26 08:50:09,831] :: INFO - Connection from 127.0.0.1:59932 on listener 'default'
[2021-11-26 08:50:09,832] :: INFO - ### 'on_mqtt_packet_received' EVENT FIRED ###
[2021-11-26 08:50:09,832] :: ERROR - Task exception was never retrieved
future: <Task finished name='Task-9' coro=<Broker.stream_connected() done, defined at /src/HBMQTT_Paho_MySQL_MongoDB/venv/lib/python3.8/site-packages/amqtt/broker.py:394> exception=TypeError("'NoneType' object is not callable")>
Traceback (most recent call last):
  File "/src/HBMQTT_Paho_MySQL_MongoDB/venv/lib/python3.8/site-packages/amqtt/broker.py", line 395, in stream_connected
    await self.client_connected(
  File "/src/HBMQTT_Paho_MySQL_MongoDB/venv/lib/python3.8/site-packages/amqtt/broker.py", line 461, in client_connected
    authenticated = await self.authenticate(
  File "/src/HBMQTT_Paho_MySQL_MongoDB/venv/lib/python3.8/site-packages/amqtt/broker.py", line 688, in authenticate
    returns = await self.plugins_manager.map_plugin_coro(
  File "/src/HBMQTT_Paho_MySQL_MongoDB/venv/lib/python3.8/site-packages/amqtt/plugins/manager.py", line 212, in map_plugin_coro
    return await self.map(self._call_coro, coro_name, *args, **kwargs)
  File "/src/HBMQTT_Paho_MySQL_MongoDB/venv/lib/python3.8/site-packages/amqtt/plugins/manager.py", line 188, in map
    ret_list = await asyncio.gather(*tasks, loop=self._loop)
  File "/src/HBMQTT_Paho_MySQL_MongoDB/venv/lib/python3.8/site-packages/amqtt/plugins/manager.py", line 201, in _call_coro
    coro = getattr(plugin.object, coro_name)(*args, **kwargs)
TypeError: 'NoneType' object is not callable

Output Client:

▶ amqtt_sub --url mqtt://localhost -t /test/#

Environment:

▶ pip list
Package       Version
------------- -------
amqtt         0.10.0 
docopt        0.6.2  
paho-mqtt     1.6.1  
passlib       1.7.4  
pip           20.0.2 
pkg-resources 0.0.0  
PyYAML        5.4.1  
setuptools    44.0.0 
six           1.16.0 
transitions   0.8.10 
websockets    9.1  

Setup unit tests running against different versions of dependencies

Suggestion:

  • Test against oldest versions we support with oldest supported python version
  • Test with versions from poetry.lock against all supported python versions
  • Update poetry.lock from time to time, possibly automated (for example with renovate)

I have not yet figured out how to generated a poetry.lock (or other pinned dependency list) with a resolver for "minumum" versions

unecessary constraint on password

Hi all,

It seems to me that amqtt's broker is needlessly requiring a password when a username is provided. It happens in https://github.com/Yakifo/amqtt/blob/master/amqtt/mqtt/protocol/broker_handler.py#L164-L167

        elif not connect.username_flag and connect.password_flag:
            connack = ConnackPacket.build(0, BAD_USERNAME_PASSWORD)  # [MQTT-3.1.2-22]
        elif connect.username_flag and not connect.password_flag:
            connack = ConnackPacket.build(0, BAD_USERNAME_PASSWORD)  # [MQTT-3.1.2-22]

As per the spec (http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc385349236 is hopefully the right ref)

If the User Name Flag is set to 0, the Password Flag MUST be set to 0 [MQTT-3.1.2-22].

So while the first elif is perfectly in line with the spec (you can't have a password without a username), I can't find anything in the spec about that second elif. Commenting out those 2 lines allows my client to connect and work just fine.

FTR, my client code is built using azure-iot-device (which uses paho behind the scene) and I use amqtt's broker to create a fake Azure IoTHub for mocking purposes. When using X.509 client certificates, the Azure libs use the "device id" (also found in the certificate's CN) and pass that to paho as a username, but without any password. For now, I've worked around the issue by monkeypatching paho to send both a username and a fake password to please amqtt. But it really looks like a bug in amqtt AFAICT.

I wanted to create a patch for this but poetry is foreign to me.

Cheers

Making use of events

I can't see any documentation on this, so:

I can see that there is an event system which means that I should be able to add code to a broker to respond to things like client connections, etc. But I can't see how to do that; given it's within the plugin system it suggests I would need to write my own plugin?

If it's simple to do, could the broker_start.py sample be modified to include some event callbacks? Or if it's too complex for that simple example can someone point me in the right direction?

It seems that the handling of unkown message ids is broken

I get a lot of these messages running amqtt as a broker:

[2022-02-13 12:02:26,878] :: WARNING :: amqtt.mqtt.protocol.handler :: Received PUBREL for unknown pending message with Id: 14882
[2022-02-13 12:02:26,879] :: WARNING :: amqtt.mqtt.protocol.handler :: Received PUBACK for unknown pending message Id: '11298'
[2022-02-13 12:02:26,879] :: WARNING :: amqtt.mqtt.protocol.handler :: Received PUBCOMP for unknown pending message with Id: 25890

Looking at the code it seems that aqmtt does nothing but print the warning, which seems wrong in my opinion. The messages come back infinitely from the clients because they have not been dealt with.

Abstract factory for plugins

Hi,

I was a contributor of HBMQTT, and I remember that dealing with custom plugins was a pain.

For my work, I started using the abstract factory pattern for plugins in Python, and that works pretty well. It allows users to add their custom plugins dynamically by just calling a method. I do believe this pattern is more user-friendly and can fit in AMQTT.

Before implementing it, I wanted to ask about your opinion. If you are not interested in this change, I'll leave plugins alone :)

`passlib` required even for the client

Seems the amqtt client pulls in passlib even though it doesn't actually use it anywhere (it's used in the server for the file-based authentication back-end). It'd be nice if we could "do away" with this dependency on the client:

  File "/opt/wicen/wicen-rfid-aprs-bridge/lib/python3.7/site-packages/wicenrfidapiclient/session.py", line 974, in _connect_mqtt
    'auto-reconnect': False
  File "/opt/wicen/wicen-rfid-aprs-bridge/lib/python3.7/site-packages/amqtt/client.py", line 127, in __init__
    "amqtt.client.plugins", context, loop=self._loop
  File "/opt/wicen/wicen-rfid-aprs-bridge/lib/python3.7/site-packages/amqtt/plugins/manager.py", line 52, in __init__
    self._load_plugins(namespace)
  File "/opt/wicen/wicen-rfid-aprs-bridge/lib/python3.7/site-packages/amqtt/plugins/manager.py", line 63, in _load_plugins
    plugin = self._load_plugin(ep)
  File "/opt/wicen/wicen-rfid-aprs-bridge/lib/python3.7/site-packages/amqtt/plugins/manager.py", line 73, in _load_plugin
    plugin = ep.load(require=True)
  File "/opt/wicen/wicen-rfid-aprs-bridge/lib/python3.7/site-packages/pkg_resources/__init__.py", line 2410, in load
    self.require(*args, **kwargs)
  File "/opt/wicen/wicen-rfid-aprs-bridge/lib/python3.7/site-packages/pkg_resources/__init__.py", line 2433, in require
    items = working_set.resolve(reqs, env, installer, extras=self.extras)
  File "/opt/wicen/wicen-rfid-aprs-bridge/lib/python3.7/site-packages/pkg_resources/__init__.py", line 786, in resolve
    raise DistributionNotFound(req, requirers)
pkg_resources.DistributionNotFound: The 'passlib<2.0.0,>=1.7.0' distribution was not found and is required by the application

DeprecationWarning: The loop argument is deprecated since Python 3.8, and scheduled for removal in Python 3.10.

I was doing some unit testing for some code that uses the amqtt.client module and this warning showed up two times.
This are the exact spots:

/home/user/.local/lib/python3.8/site-packages/amqtt/client.py:119: DeprecationWarning: The loop argument is deprecated since Python 3.8, and scheduled for removal in Python 3.10. self._connected_state = asyncio.Event(loop=self._loop)

And

/home/user/.local/lib/python3.8/site-packages/amqtt/client.py:120: DeprecationWarning: The loop argument is deprecated since Python 3.8, and scheduled for removal in Python 3.10. self._no_more_connections = asyncio.Event(loop=self._loop)

I'm using the 0.10.0 version.

Broker not sending messages

I'm hoping I'm just doing something wrong, but I can't seem to manage to get the Broker working quite right.

Here is my setup:

if __name__ == '__main__':

    formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s"
    logging.basicConfig(level=logging.DEBUG, format=formatter)

    conf = {
        'listeners': {
            'default': {
                'type': 'tcp',
                'bind': '0.0.0.0:1883'
            },
            'ws1': {
                'type': 'ws',
                'bind': '0.0.0.0:9001'
            }
        },
        'auth': {
            'allow-anonymous': True
        },
        'topic-check': {
            'enabled': False
        }
    }
    mqttBroker = Broker(conf)
    asyncio.get_event_loop().run_until_complete(mqttBroker.start())
    asyncio.get_event_loop().run_forever()

I've tried MQTT-Explorer (which I use for all my mqtt work) and the mqtt client plugin for VSCode.
I'm able to connect to either the tcp or the ws port no problem.
I see the log messages output for all the connections on the broker.
I see the messages being posted to the broker log, and I see what appears to be an attempt to send out the messages.
But... no messages seem to make it out to any clients I've tried.

Here is the Broker log when I sent a very basic message to a simple topic.
topic: test
message: two

[2022-05-22 20:58:11,052] :: DEBUG :: amqtt.broker.plugins :: Plugins len(_fired_events)=3
[2022-05-22 20:58:11,052] :: INFO :: amqtt.broker.plugins.event_logger_plugin :: ### 'on_mqtt_packet_received' EVENT FIRED ###
[2022-05-22 20:58:11,053] :: DEBUG :: amqtt.broker.plugins.packet_logger_plugin :: mqtt-explorer-4a9e38c0 <-in-- PublishPacket(ts=2022-05-22 20:58:11.052645, fixed=MQTTFixedHeader(length=11, flags=0x2), variable=PublishVariableHeader(topic=test, packet_id=54150), payload=PublishPayload(data="bytearray(b'two')"))
[2022-05-22 20:58:11,053] :: DEBUG :: amqtt.mqtt.protocol.handler :: Add message to delivery
[2022-05-22 20:58:11,053] :: DEBUG :: amqtt.broker.plugins :: Plugins len(_fired_events)=6
[2022-05-22 20:58:11,053] :: DEBUG :: amqtt.mqtt.protocol.handler :: Message queue size: 1
[2022-05-22 20:58:11,053] :: DEBUG :: amqtt.mqtt.protocol.handler :: Delivering message <amqtt.session.IncomingApplicationMessage object at 0x7fc58c36e3c0>
[2022-05-22 20:58:11,053] :: INFO :: amqtt.broker.plugins.event_logger_plugin :: ### 'on_mqtt_packet_sent' EVENT FIRED ###
[2022-05-22 20:58:11,053] :: DEBUG :: amqtt.broker.plugins.packet_logger_plugin :: mqtt-explorer-4a9e38c0 -out-> PubackPacket(ts=2022-05-22 20:58:11.053226, fixed=MQTTFixedHeader(length=2, flags=0x0), variable=PacketIdVariableHeader(packet_id=54150), payload=None)
[2022-05-22 20:58:11,053] :: DEBUG :: amqtt.broker :: mqtt-explorer-4a9e38c0 handling message delivery
[2022-05-22 20:58:11,054] :: DEBUG :: amqtt.broker.plugins :: Plugins len(_fired_events)=1
[2022-05-22 20:58:11,054] :: INFO :: amqtt.broker.plugins.event_logger_plugin :: ### 'on_broker_message_received' EVENT FIRED ###
[2022-05-22 20:58:11,054] :: DEBUG :: amqtt.broker :: broadcasting {'session': Session(clientId=mqtt-explorer-4a9e38c0, state=connected), 'topic': 'test', 'data': bytearray(b'two')}
[2022-05-22 20:58:11,054] :: DEBUG :: amqtt.mqtt.protocol.handler :: 0 message(s) available for delivery

MQTT-Explorer never received any message though.
MQTT-Explorer also automatically subscribes to the root wildcard, so It should see every single message that is sent on all topics. However, no topics even appear in it's topic list.

Thanks for any help.
I hope I'm just missing something obvious.

edit
Just to be sure I wasn't doing something glaringly wrong.
I tried just running the broker from the shell . Same outcome.
the connection to the broker through mqtt-explorer is good, but no messages seem to find their way out.

> pip3 install amqtt
spowell@stowe $amqtt
[2022-05-25 12:30:50,699] :: INFO - Finished processing state new exit callbacks.
[2022-05-25 12:30:50,699] :: INFO - Finished processing state starting enter callbacks.
[2022-05-25 12:30:50,700] :: INFO - ### 'on_broker_pre_start' EVENT FIRED ###
[2022-05-25 12:30:50,700] :: INFO - Listener 'default' bind to 0.0.0.0:1883 (max_connections=-1)
[2022-05-25 12:30:50,700] :: INFO - Finished processing state starting exit callbacks.
[2022-05-25 12:30:50,700] :: INFO - Finished processing state started enter callbacks.
[2022-05-25 12:30:50,700] :: INFO - ### 'on_broker_post_start' EVENT FIRED ###
[2022-05-25 12:31:09,497] :: INFO - Listener 'default': 1 connections acquired
[2022-05-25 12:31:09,497] :: INFO - Connection from 127.0.0.1:54592 on listener 'default'
[2022-05-25 12:31:09,500] :: INFO - ### 'on_mqtt_packet_received' EVENT FIRED ###
[2022-05-25 12:31:09,501] :: INFO - Finished processing state new exit callbacks.
[2022-05-25 12:31:09,501] :: INFO - Finished processing state connected enter callbacks.
[2022-05-25 12:31:09,502] :: INFO - ### 'on_mqtt_packet_sent' EVENT FIRED ###
[2022-05-25 12:31:09,502] :: INFO - ### 'on_broker_client_connected' EVENT FIRED ###
[2022-05-25 12:31:09,505] :: INFO - ### 'on_mqtt_packet_received' EVENT FIRED ###
[2022-05-25 12:31:09,506] :: INFO - ### 'on_mqtt_packet_received' EVENT FIRED ###
[2022-05-25 12:31:09,506] :: INFO - ### 'on_mqtt_packet_sent' EVENT FIRED ###
[2022-05-25 12:31:09,507] :: INFO - ### 'on_mqtt_packet_sent' EVENT FIRED ###
[2022-05-25 12:31:16,648] :: INFO - ### 'on_mqtt_packet_received' EVENT FIRED ###
[2022-05-25 12:31:16,649] :: INFO - ### 'on_mqtt_packet_sent' EVENT FIRED ###
[2022-05-25 12:31:16,650] :: INFO - ### 'on_broker_message_received' EVENT FIRED ###
[2022-05-25 12:31:58,747] :: INFO - ### 'on_mqtt_packet_received' EVENT FIRED ###
[2022-05-25 12:31:58,749] :: INFO - Finished processing state connected exit callbacks.
[2022-05-25 12:31:58,749] :: INFO - Finished processing state disconnected enter callbacks.
[2022-05-25 12:31:58,749] :: INFO - Listener 'default': 0 connections acquired
[2022-05-25 12:31:58,749] :: INFO - ### 'on_broker_client_disconnected' EVENT FIRED ###

python 3.6 support

Python 3.6.8 is planned to be the last bugfix release for 3.6.x. Following the release of 3.6.8, we plan to provide security fixes for Python 3.6 as needed through 2021, five years following its initial release.

Python 3.6.8 was released 2018-12-24. Since then no bugfix releases were done, only security fixes are provided.

I would drop 3.6 support in the 0.11.x release of aMQTT. What does everyone else think? Objections?

A failing plugin kills the broker

Dear @ALL!

The exception handling of a failed import kills the broker by backstabbing:

    def _load_plugin(self, ep: pkg_resources.EntryPoint):
        try:
            self.logger.debug(" Loading plugin %s" % ep)
            plugin = ep.load(require=True)
            self.logger.debug(" Initializing plugin %s" % ep)
            plugin_context = copy.copy(self.app_context)
            plugin_context.logger = self.logger.getChild(ep.name)
            obj = plugin(plugin_context)
            return Plugin(ep.name, ep, obj)
        except ImportError as ie:   <- this line
            self.logger.warning("Plugin %r import failed: %s" % (ep, ie))
        except pkg_resources.UnknownExtra as ue:
            self.logger.warning("Plugin %r dependencies resolution failed: %s" % (ep, ue))

Plugins are imported via pkg_resources. If the plugin fails to load (due to an internal error) an ImportError is raised via pkg_resources. This leads to a return value of None to the calling routine, leading to the happy but wrong message:


[2021-03-31 00:32:08,049] :: WARNING - Plugin EntryPoint.parse('amqtt_db = amqtt_db.plugin:DBPlugin') import failed: 'amqtt_db'
[2021-03-31 00:32:08,049] :: DEBUG -  Plugin amqtt_db ready

and then to this stacktrace:

future: <Task finished coro=<Broker.stream_connected() done, defined at /home/volker/workspace/venvs/amqtt-db-Sb9e8jh5-py3.7/lib/python3.7/site-packages/hbmqtt/broker.py:335> exception=AttributeError("'NoneType' object has no attribute 'name'")>
Traceback (most recent call last):
  File "/home/volker/workspace/venvs/amqtt-db-Sb9e8jh5-py3.7/lib/python3.7/site-packages/hbmqtt/broker.py", line 336, in stream_connected
    await self.client_connected(listener_name, StreamReaderAdapter(reader), StreamWriterAdapter(writer))
  File "/home/volker/workspace/venvs/amqtt-db-Sb9e8jh5-py3.7/lib/python3.7/site-packages/hbmqtt/broker.py", line 386, in client_connected
    authenticated = await self.authenticate(client_session, self.listeners_config[listener_name])
  File "/home/volker/workspace/venvs/amqtt-db-Sb9e8jh5-py3.7/lib/python3.7/site-packages/hbmqtt/broker.py", line 541, in authenticate
    filter_plugins=auth_plugins)
  File "/home/volker/workspace/venvs/amqtt-db-Sb9e8jh5-py3.7/lib/python3.7/site-packages/hbmqtt/plugins/manager.py", line 206, in map_plugin_coro
    return (await self.map(self._call_coro, coro_name, *args, **kwargs))
  File "/home/volker/workspace/venvs/amqtt-db-Sb9e8jh5-py3.7/lib/python3.7/site-packages/hbmqtt/plugins/manager.py", line 168, in map
    p_list = [p.name for p in self.plugins]
  File "/home/volker/workspace/venvs/amqtt-db-Sb9e8jh5-py3.7/lib/python3.7/site-packages/hbmqtt/plugins/manager.py", line 168, in <listcomp>
    p_list = [p.name for p in self.plugins]
AttributeError: 'NoneType' object has no attribute 'name'

Where the None is the return value of the failed loading of the plugin.

From the perspective of a user this cannot be understood. Even as a developer you are challenged.

I propose to push the exception handling simply a level up into

def _load_plugins(self, namespace):
        self.logger.debug("Loading plugins for namespace %s" % namespace)
        for ep in pkg_resources.iter_entry_points(group=namespace):
            plugin = self._load_plugin(ep)
            self._plugins.append(plugin)
            self.logger.debug(" Plugin %s ready" % ep.name)

Here the exceptions can be handled with more context and consistency.

Shall I do a pullrequest or are you fixing this by yourselves?

Cheers,
Volker

Flake8 on long doc strings

How do we handle long doc string line containing e.g. URLS such as

    Send an MQTT `PUBLISH <http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718037>`_ message and wait for acknowledgment depending on

(taken from client.py)

Append noqa and end of line or break the URL? I'm not sure what is more common. Opinions?

Broker liveness probe failures in k8s

Hi,
I'm trying to use amqtt broker in Kubernetes, and I want to do a health check using liveness probe which leads to NoDataException

Health checks are done as following:

livenessProbe:
  tcpSocket:
    port: 1883
  timeoutSeconds: 90
  periodSeconds: 120
  successThreshold: 1
  failureThreshold: 3

Here are the Broker's logs:

ERROR :: asyncio :: Task exception was never retrieved
future: <Task finished name='Task-59465' coro=<Broker.stream_connected() done, defined at /Users/b.abdelmalak/Library/Caches/pypoetry/virtualenvs/test-broker-gKCTfSBk-py3.8/lib/python3.8/site-packages/amqtt/broker.py:394> exception=NoDataException('No more data')>
Traceback (most recent call last):
  File "/Users/b.abdelmalak/Library/Caches/pypoetry/virtualenvs/test-broker-gKCTfSBk-py3.8/lib/python3.8/site-packages/amqtt/broker.py", line 395, in stream_connected
    await self.client_connected(
  File "/Users/b.abdelmalak/Library/Caches/pypoetry/virtualenvs/test-broker-gKCTfSBk-py3.8/lib/python3.8/site-packages/amqtt/broker.py", line 416, in client_connected
    handler, client_session = await BrokerProtocolHandler.init_from_connect(
  File "/Users/b.abdelmalak/Library/Caches/pypoetry/virtualenvs/test-broker-gKCTfSBk-py3.8/lib/python3.8/site-packages/amqtt/mqtt/protocol/broker_handler.py", line 130, in init_from_connect
    connect = await ConnectPacket.from_stream(reader)
  File "/Users/b.abdelmalak/Library/Caches/pypoetry/virtualenvs/test-broker-gKCTfSBk-py3.8/lib/python3.8/site-packages/amqtt/mqtt/packet.py", line 249, in from_stream
    variable_header = await cls.VARIABLE_HEADER.from_stream(
  File "/Users/b.abdelmalak/Library/Caches/pypoetry/virtualenvs/test-broker-gKCTfSBk-py3.8/lib/python3.8/site-packages/amqtt/mqtt/connect.py", line 120, in from_stream
    protocol_name = await decode_string(reader)
  File "/Users/b.abdelmalak/Library/Caches/pypoetry/virtualenvs/test-broker-gKCTfSBk-py3.8/lib/python3.8/site-packages/amqtt/codecs.py", line 66, in decode_string
    length_bytes = await read_or_raise(reader, 2)
  File "/Users/b.abdelmalak/Library/Caches/pypoetry/virtualenvs/test-broker-gKCTfSBk-py3.8/lib/python3.8/site-packages/amqtt/codecs.py", line 56, in read_or_raise
    raise NoDataException("No more data")
amqtt.errors.NoDataException: No more data

I also noticed that the broker keeps the connection even after Kubernetes closes the socket

To reproduce:

python example

import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server = ('localhost', 1883)
sock.connect(server)
sock.close()

amqtt server not relaying traffic

Platform: Debian GNU/Linux 10 (Buster) on AMD64, also reproduced on Gentoo/AMD64
amqtt installed either from pypi or via git using pip.

A simple test without a configuration file… if I open up three terminal windows:

  1. I run amqtt (no arguments)… the server starts successfully, binds to port 1883, listens for traffic. So far so good.
  2. Second terminal I run amqtt_sub --url mqtt://localhost -t \# (note the backslash: needed on most shells to escape the #), it starts, connects and waits for traffic. amqtt on terminal (1) reports a client has connected.
  3. Third terminal, I run amqtt_pub --url mqtt://localhost -t /testing -m 'test message'… it exits without error.

After running that test, no traffic is observed on terminal (2). If I kill amqtt and amqtt_sub, start up another MQTT server such as Mosquitto in place of amqtt, then repeat steps 2 & 3, I see the traffic relayed.

I have also tried setting up a configuration file with a password list and using authenticated users, it still fails to pass through traffic.

The idea of an embeddable and extensible MQTT server is fantastic, so I'm really hoping I can get this working but I'm starting simple first before I try hacking things.

asyncio.get_event_loop() deprecated in Python 3.10

So the examples should probably be changed to use a supported method going forward. I'm not an asyncio expert, and all the variations I've tried using asyncio.get_running_loop() or asyncio.new_event_loop() have failed.

If anyone knows how this is supposed to work going forward, let me know and I'll test and update the docs.

don't retain messages from anonymous clients

from @shipmints

The logic should not bother to retain session replay messages for anonymous client sessions. If a client does not specify a client id, a random UUID is generated and that is stored forever in the broker session cache. This id (and session) will never be reused by the same client which will always send empty client id forcing a new random id to be created. The unreachable session will collect messages for a client connection that can never be reestablished, further exhausting memory.

Subscriber not receiving messages from local hbmqtt broker

I am running subscriber to hbmqtdd, which i ran on my local machine, either on 127.0.0.1, or another local ip address, expected to receive $SYS packets, but simply not receiving them. The same subscriber receives the SYS messages properly from test.mosquitto.org, so the subscriber logic is working ok in that case. If i publish a message to a topic on the local broker, then the subscriber does not receive the message.

Commands used

C:\Users\madhu\AppData\Local\Programs\Python\Python37\Scripts\hbmqtt_sub --url mqtt://127.0.0.1:1883/ -t '$SYS/#' -q 0

[2021-03-18 00:34:25,582] :: INFO - Finished processing state new exit callbacks.
[2021-03-18 00:34:25,582] :: INFO - Finished processing state connected enter callbacks.

… but nothing else…………………...

C:\Users\madhu\AppData\Local\Programs\Python\Python37\Scripts\hbmqtt.exe -d

Connection with mysql

hello together,
i have this amqtt during my research of mqtt-Broker writing in python. Really nice project.

I wanted to ask if the persistence with a mysql-DB is also possible, why i just see a example with sqlite.

i will be great for any reply and any help.

Does $SYS topic broadcast work?

Here's my config dict:

broker_config = {
    "listeners": {
        "default": {
            "type": "tcp",
            "bind": "0.0.0.0:1883",
        },
    },
    "sys_interval": 10,
    "auth": {
        "allow-anonymous": True,
        "plugins": ["auth_anonymous"],
    },
    "topic-check": {
        "enabled": False,
        "plugins": [],
    },
}

When I subscribe to $SYS/# I don't get any messages.

embedding amqtt in existing http server

Food for thought related to this: in my application I'm using aiohttp as the HTTP server for the main application providing a REST API. I then expose real-time updates via MQTT over websockets.

aiohttp supports creation and hosting of websockets. Right now for the user to access MQTT I have three choices:

  • bind amqtt server to a separate port, expose that via the firewall and have my application direct the user to the new port (fine for development, not sure about this for production use though)
  • bind amqtt server to a separate private port, configure my application's front-end reverse proxy (nginx) to also forward that second port to some URI namespace.
  • bind amqtt server to a separate private port, configure some sort of reverse proxy within aiohttp to forward the MQTT/websocket traffic to amqtt.

I'm wondering if that ListenerType enumeration shouldn't have a plugin option. This would require the user write a plug-in that implements the same interface as the tcp and ws listener types. Thus, in my application, I could implement a plug-in that creates a websocket using aiohttp calls and binds it to a path in my server's namespace. I'm thinking in this situation, things like the bind option become optional since bind makes no sense when you're not actually binding to a network socket directly.

Anyway, probably this is in the scope of a second related pull-request.

Originally posted by @sjlongland in #72 (comment)

Search deprecated repo for gems to merge

we should check all merge requests on the hbmqtt repo and decide if we want to include them in this fork - and make a new merge request here if desired.

provide list of pinned dependencies for development

During development we should have a set of pinned dependencies for a reproducible development and CI setup.

Options:

  • requirements.txt
  • pipenv
  • poetry

I am in a love/hate relationship with poetry and would go with that :)

rename module and create import hook

rename the module and create an import hook to allow importing the old name.

  • Decide on module name
  • rename repo
  • rename package to match module name (blocked by #7)
  • Implement import hook & deprecation warning when importing the old name

New release

It would be nice to have a new release that includes #64. Would allow to update distribution packages which out patching.

Thanks

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.