Comments (9)
@DurandA I like this approach, we will start working on this 👍
from gmqtt.
Hello. I've extended the mqtt client.
Features:
- Can use (multiple!) function callbacks on subscriptions
- Can decode/encode yaml messages
- Can define a "namespace" that will be prepended to every topic (i use it to make a unique server act as multiple ones)
from gmqtt import Client
from gmqtt.mqtt.constants import MQTTv50
import os, yaml, re, asyncio, logging
log = logging.getLogger('mqtt_client')
class MQTTClient(Client):
def _handle_msg(self, client, topic, payload, qos, properties):
if not 'subscription_identifier' in properties: return
sub_id = properties['subscription_identifier'][0]
if topic.startswith(self.namespace): topic = topic[len(self.namespace):]
if properties['content_type'][0] == 'text/yaml':
payload = yaml.safe_load(payload)
else:
payload = payload.decode()
for fun in self.topics[sub_id][1]:
log.debug(f'Executing {fun.__name__} because of {self.topics[sub_id][0]} ({sub_id}) match')
if asyncio.iscoroutinefunction(fun):
result = asyncio.create_task(fun(client, topic, payload))
else:
result = fun(client, topic, payload)
def _handle_conn(self, *a):
while 1:
try:
topic = self.pending_subscriptions.pop()
except IndexError:
break
self.subscribe(*topic)
def add_func_handler(self, topic, func):
sub_id = max(self.topics.keys())+1 if self.topics else 1 # sub_id range is 1 to 2^28-1
if topic in self.topics:
self.topics[sub_id][1].append(func)
else:
self.topics[sub_id] = (topic, [func,])
self.subscribe(topic, sub_id)
def subscribe_func(self, topic):
return lambda x: self.add_func_handler(topic, x)
def __init__(self, client_id, namespace=None):
self.topics = {}
self.pending_subscriptions = []
self.namespace = namespace+'/' if namespace and namespace[-1] != '/' else namespace
Client.__init__(self, client_id)
self.on_message = self._handle_msg
self.on_connect = self._handle_conn
def publish(self, *args, **kwargs):
user_property = kwargs['user_property'] if 'user_property' in kwargs else tuple()
payload = args[1]
if isinstance(payload, str):
content_type = 'text'
else:
payload = yaml.safe_dump(payload)
content_type = 'text/yaml'
return Client.publish(self, os.path.join(self.namespace, args[0]), payload, *(args[2:]), **kwargs, content_type=content_type)
# Subscribe or queue the subscription
def subscribe(self, topic, sub_id):
if not self.is_connected:
return self.pending_subscriptions.append((topic, sub_id))
Client.subscribe(self, os.path.join(self.namespace, topic), no_local=True, subscription_identifier=sub_id)
License of this snippet is whatever is the most permissive license compatible with this library.
Also, you can also use this decorator:
@client.subscribe_func('sensors/temp/+')
def need(client, topic, payload):
print('Temperature:', topic, payload)
from gmqtt.
Hello, is somebody working on this? Thinking about implementing it myself.
from gmqtt.
Aww, i actually had an implementation compatible with 3.11, but I've dropped it since i prefer doing it in a cleaner way with subscription_identifier.
Maybe I'll reimplement it later.
from gmqtt.
@edocod1 go for it! perhaps @Lenka42 would be willing to assist. I can assist if required as well.
from gmqtt.
Hi @edocod1,
Thanks for your snippet, but we can't do in that way - because properties
is a feature of MQTT5, but we are supporting MQTT3.11 too;
Fill free to create a PR for that feature, we will review it and help you with implementation
from gmqtt.
You can create a new example (in examples folder) with your code of extended MQTTClient instance and make a note that it will work only in 5 version of protocol;
from gmqtt.
@edocod1 would you be open to collaboration on your fork? If yes, please give me access to your fork and we can work away at the issues until the PR is accepted.
from gmqtt.
Hi @skewty! That snippet there is all i have. I just expanded the Client class to add my own convenience method! The whole repo is actually a p2p web-crawler using mqtt for metrics and communication between peers
from gmqtt.
Related Issues (20)
- How to increase the max in IdGenerator from client? HOT 1
- Resubscribe after autoreconnect HOT 1
- [PROPERTIES] received invalid property id 105, disconnecting HOT 2
- Handler does not work for null body retained messages published before the handler created HOT 6
- Client still running after disconnect() in 0.6.9 HOT 1
- Exceeded reconnect_retries seems seems not to be working HOT 1
- Unsubscribe doesn't remove the topic from the internal list
- I can't to use ca.crt, client.crt, client.key (X509 certificate) to connect mqtt broker. HOT 2
- Read message from mqtt and post Data to wialon HOT 1
- request and response mechanism
- won't remove resend messages if PUBREC is faster than message pushed HOT 1
- publish() and then disconnect() may result in messages not received by a broker
- is there a handler like python.paho.mqtt message_callback_add? HOT 2
- No PubBack Message HOT 1
- Connection error
- [RECV EMPTY] Connection will be reset automatically. HOT 1
- blocking the loop when processing messages for a long time HOT 1
- Formal documentation for API HOT 1
- The subscribe non blocking method
- The QoS 2 Message Receive Queue is not being cleared HOT 1
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from gmqtt.