streamnative / mop Goto Github PK
View Code? Open in Web Editor NEWMQTT on Pulsar implemented using Pulsar Protocol Handler
License: Apache License 2.0
MQTT on Pulsar implemented using Pulsar Protocol Handler
License: Apache License 2.0
I supposed to run pulsar in docker in standalone mode, so I modified the standalone.conf
file with adding the following properties:
messagingProtocols=mqtt
protocolHandlerDirectory=/root/mop/mqtt-impl/target
mqttListeners=mqtt://127.0.0.1:1883
advertisedAddress=127.0.0.1
However, when running the pulsar, it showed the following message:
02:37:07.832 [main] INFO org.apache.pulsar.broker.protocol.ProtocolHandlerUtils - Searching for protocol handlers in /root/mop/mqtt-impl/target
02:37:07.832 [main] WARN org.apache.pulsar.broker.protocol.ProtocolHandlerUtils - Protocol handler directory not found
02:37:07.834 [main] ERROR org.apache.pulsar.broker.PulsarService - Failed to start Pulsar service: No protocol handler is found for protocol `mqtt`. Available protocols are : {}
I had built the MoP project succesfully and the NAR file had occurred in the /root/mop/mqtt-impl/target
directory correctly. I want to know if my process of installing had made some mistakes. Thank you very much.
Can you add server information and online client IP port information to mop browser?
case 1:
used mqtt cli directly to test producer/consumer:
mqtt test -h localhost -p 1883
MQTT 3: OK
- Maximum topic length: 0 bytes
- QoS 0: Received 0/10 publishes in 10033.91ms
- QoS 1: Received 0/10 publishes in 10156.59ms
- QoS 2:
case 2:
used patho python clients to test producer/consumer, also cannot receive mesg as consumer
case 3:
mqtt producer sends to topic (on 1883) and regular pulsar consumer (on 6650) received the message. However, messages sent out to the topic that mqtt consumer is supposed to receive didn't happen. That means producer seems to work fine but consumer was not able to receive messages.
In both cases, there was no errors in pulsar broker logs. The connection/publish/subscribe seem all succeeded but no message received on consumer end. We tried both qos = 0 and qos = 1, same effect.
What could be the issue here?
Hi,
Thanks for your work.
Does mop support MQTT-SN protocol (MQTT For Sensor Networks) over UDP ?
In addition, it would be nice to specify in the README which MQTT version is supported: 3.1.1 ? 5.0 ?
Regards
Currently, PSK identities are configured when MoP startup. If adding new identities, we have to restart the server, but new clients with new identity is a common scenario.
How to enable mqtt related debugging logs
The MQTT spec calls for terminating existing clients with the same ClientID on CONNECT
:
If the ClientId represents a Client already connected to the Server then the Server MUST disconnect the existing Client.
This is currently implemented using the ConnectionDescriptorStore
however it's not shared between brokers so connection from a client to another broker within the cluster will violate this part of the spec.
We either need to decide to implement the spec correctly or document non-compliance so users can be aware.
When mqtt is connected, the KeepAlive time is set, but it does not work. Which method implements the heartbeat mechanism?
when I use EMQ broker bridge with mop, error occurs
{"mqttFixedHeader":{"messageType":"CONNECT","isDup":false,"qosLevel":"AT_MOST_ONCE","isRetain":false,"remainingLength":37},"decoderResult":{"cause":{"detailMessage":"MQTT and -123 are not match","stackTrace":[],"suppressedExceptions":[]}}}
the connect payload is null
When two same clientIds are connected, using the mqtt 1883 port will disconnect the previous connection, but using the proxy mqtt 5682 port will not disconnect the two same clientIds
pulsar version 2.8.1
mop 2.8.1.0
When mqtt 1883 port is connected, there is a + or # sign in the subscription topic. It is normal
When mqtt proxy port 5682 is connected, there is a + or # sign in the subscription topic, and an error is reported, Disconnect
topic example: /a/#
error:
17:48:10.693 [pulsar-io-29-3] INFO io.streamnative.pulsar.handlers.mqtt.support.DefaultProtocolMethodProcessorImpl - [Subscribe] [[id: 0x7bfe505e, L:/127.0.0.1:1883 - R:/127.0.0.1:60697]] msg: MqttSubscribeMessage[fixedHeader=MqttFixedHeader[messageType=SUBSCRIBE, isDup=false, qosLevel=AT_LEAST_ONCE, isRetain=false, remainingLength=9], variableHeader=MqttMessageIdAndPropertiesVariableHeader[messageId=51601, properties=io.netty.handler.codec.mqtt.MqttProperties@1dcb8506], payload=MqttSubscribePayload[MqttTopicSubscription[topicFilter=/a/#, option=SubscriptionOption[qos=AT_MOST_ONCE, noLocal=false, retainAsPublished=false, retainHandling=SEND_AT_SUBSCRIBE]]]]
17:48:10.694 [pulsar-io-29-3] ERROR io.streamnative.pulsar.handlers.mqtt.support.DefaultProtocolMethodProcessorImpl - [null] Failed to process MQTT subscribe.
java.util.concurrent.CompletionException: java.lang.NullPointerException
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) ~[?:1.8.0_171]
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) ~[?:1.8.0_171]
at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:659) ~[?:1.8.0_171]
at java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:669) ~[?:1.8.0_171]
at java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:1997) [?:1.8.0_171]
at io.streamnative.pulsar.handlers.mqtt.utils.PulsarTopicUtils.getOrCreateSubscription(PulsarTopicUtils.java:71) ~[?:?]
at io.streamnative.pulsar.handlers.mqtt.support.DefaultProtocolMethodProcessorImpl.lambda$processSubscribe$1(DefaultProtocolMethodProcessorImpl.java:317) ~[?:?]
at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:981) [?:1.8.0_171]
at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2124) [?:1.8.0_171]
at io.streamnative.pulsar.handlers.mqtt.support.DefaultProtocolMethodProcessorImpl.processSubscribe(DefaultProtocolMethodProcessorImpl.java:313) [HeTx-YSOEoMroq5t1w13wQ/:?]
at io.streamnative.pulsar.handlers.mqtt.MQTTInboundHandler.channelRead(MQTTInboundHandler.java:63) [HeTx-YSOEoMroq5t1w13wQ/:?]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) [io.netty-netty-codec-4.1.66.Final.jar:4.1.66.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) [io.netty-netty-codec-4.1.66.Final.jar:4.1.66.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) [io.netty-netty-handler-4.1.66.Final.jar:4.1.66.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) [io.netty-netty-common-4.1.66.Final.jar:4.1.66.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [io.netty-netty-common-4.1.66.Final.jar:4.1.66.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.66.Final.jar:4.1.66.Final]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
Caused by: java.lang.NullPointerException
at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:878) ~[com.google.guava-guava-30.1-jre.jar:?]
at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.get(ConcurrentOpenHashMap.java:102) ~[org.apache.pulsar-pulsar-common-2.8.1.jar:2.8.1]
at org.apache.pulsar.broker.service.persistent.PersistentTopic.getSubscription(PersistentTopic.java:1596) ~[org.apache.pulsar-pulsar-broker-2.8.1.jar:2.8.1]
at org.apache.pulsar.broker.service.persistent.PersistentTopic.getSubscription(PersistentTopic.java:163) ~[org.apache.pulsar-pulsar-broker-2.8.1.jar:2.8.1]
at io.streamnative.pulsar.handlers.mqtt.utils.PulsarTopicUtils.lambda$getOrCreateSubscription$3(PulsarTopicUtils.java:76) ~[?:?]
at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656) ~[?:1.8.0_171]
... 33 more
17:48:18.314 [pulsar-client-io-72-1] ERROR io.streamnative.pulsar.handlers.mqtt.proxy.MQTTProxyProtocolMethodProcessor - The broker channel((localhost,1883)) is not writable!
Currently, we only handle non-batch Pulsar messages in the mop. For batch messages, it's better to leverage the batch index acknowledgment feature. Otherwise, we need to maintain batch acker in the mop.
Here is the template https://streamnative.slab.com/posts/repository-release-template-prb0lvyt
If you have any questions about the release doc template, please leave your comment on it, do not edit it directly.
I've found the similar issue #20 . Generally the same problem.
I'm using pulsar 2.8.0 (by docker), and mop 2.9.0-SNAPSHOT.
I've sent a message to topic by mqtt protocol, and I can receive the message using pulsar consumer, but failed on mqtt consumer.
I've used nodejs
const mqtt = require('mqtt')
const host = '10.191.5.110'
const port = '1883'
const clientId = `mqtt_${Math.random().toString(16).slice(3)}`
const connectUrl = `mqtt://${host}:${port}`
const client = mqtt.connect(connectUrl, {
clientId,
clean: true,
connectTimeout: 4000,
reconnectPeriod: 1000,
})
const topic = 'nodejs-mqtt'
client.on('connect', () => {
console.log('Connected')
client.subscribe([topic], () => {
console.log(`Subscribe to topic '${topic}'`)
})
client.publish(topic, 'nodejs mqtt test', { qos: 0, retain: false }, (error) => {
if (error) {
console.error(error)
}
})
})
client.on('message', (top, payload) => {
console.log('Received Message:', top, payload.toString())
})
and also java (the same code in READ.MD
@Test
public void test03() throws Exception {
MQTT mqtt = new MQTT();
mqtt.setHost("10.191.5.110", 1883);
mqtt.setClientId("java-test");
BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
Topic[] topics = { new Topic("persistent://public/default/nodejs-mqtt", QoS.AT_LEAST_ONCE) };
connection.subscribe(topics);
// publish message
connection.publish("persistent://public/default/nodejs-mqtt", "Hello MOP!".getBytes(), QoS.AT_LEAST_ONCE, false);
// receive message
try {
org.fusesource.mqtt.client.Message received = connection.receive();
System.out.println("Message received, " + new String(received.getPayload()));
} catch(Exception e) {
e.printStackTrace();
}
}
but no messages returned.
I've also tried mop v2.8.0 but nothing changed.
As I'm using pulsar-manager, I can see from the management page that the subscription type in MQTT consumer is 'NONE', as it was 'EXCLUSIVE' when I was using pulsar consumer. I don't know if it matters.
Pls tell me what I've done wrong. Thx.
Currently, if the messages are sent by Pulsar with batch mode, the mqtt broker can dispatch the batch messages to the mqtt client, but the acknowledgment does not handle well.
Use topic: /a/# to subscribe, send topic: /a/b/c to send data, you can subscribe to it, but topic: /a/b/d or /a/e/c can not subscribe to the data. Use mqtt mosquitto
Can be subscribed normally
Hi, does MoP accept ws connections? For example with emq broker I can use javascript mqtt libraries and connect using ws since tcp is not possible on browsers.
一: Caused by: org.postgresql.util.PSQLException: 错误: null value in column "id" of relation "pulsar_postgres_jdbc_sink" violates not-null constraint
详细:失败, 行包含(null, null).
... 14 more
10:44:55.576 [pool-6-thread-1] ERROR org.apache.pulsar.io.jdbc.JdbcAbstractSink - Update count 1 not match total number of records 24
10:45:11.792 [pulsar-timer-6-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [pulsar-postgres-jdbc-sink-topic] [public/default/pulsar-postgres-jdbc-sink] [e4f13] Prefetched messages: 0 --- Consume throughput received: 3.22 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
10:45:35.575 [pool-6-thread-1] ERROR org.apache.pulsar.io.jdbc.JdbcAbstractSink - Got exception
org.postgresql.util.PSQLException: 错误: 当前事务被终止, 事务块结束之前的查询被忽略
at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2533) ~[postgresql-42.2.12.jar:42.2.12]
at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2268) ~[postgresql-42.2.12.jar:42.2.12]
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:313) ~[postgresql-42.2.12.jar:42.2.12]
at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:448) ~[postgresql-42.2.12.jar:42.2.12]
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:369) ~[postgresql-42.2.12.jar:42.2.12]
at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:159) ~[postgresql-42.2.12.jar:42.2.12]
at org.postgresql.jdbc.PgPreparedStatement.execute(PgPreparedStatement.java:148) ~[postgresql-42.2.12.jar:42.2.12]
at org.apache.pulsar.io.jdbc.JdbcAbstractSink.flush(JdbcAbstractSink.java:203) ~[pulsar-io-jdbc-core-2.8.1.jar:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) [?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: org.postgresql.util.PSQLException: 错误: null value in column "id" of relation "pulsar_postgres_jdbc_sink" violates not-null constraint
详细:失败, 行包含(null, null).
... 14 more
10:45:35.576 [pool-6-thread-1] ERROR org.apache.pulsar.io.jdbc.JdbcAbstractSink - Update count 1 not match total number of records 154
10:45:55.576 [pool-6-thread-1] ERROR org.apache.pulsar.io.jdbc.JdbcAbstractSink - Got exception
org.postgresql.util.PSQLException: 错误: 当前事务被终止, 事务块结束之前的查询被忽略
at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2533) ~[postgresql-42.2.12.jar:42.2.12]
at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2268) ~[postgresql-42.2.12.jar:42.2.12]
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:313) ~[postgresql-42.2.12.jar:42.2.12]
at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:448) ~[postgresql-42.2.12.jar:42.2.12]
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:369) ~[postgresql-42.2.12.jar:42.2.12]
at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:159) ~[postgresql-42.2.12.jar:42.2.12]
at org.postgresql.jdbc.PgPreparedStatement.execute(PgPreparedStatement.java:148) ~[postgresql-42.2.12.jar:42.2.12]
at org.apache.pulsar.io.jdbc.JdbcAbstractSink.flush(JdbcAbstractSink.java:203) ~[pulsar-io-jdbc-core-2.8.1.jar:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) [?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: org.postgresql.util.PSQLException: 错误: null value in column "id" of relation "pulsar_postgres_jdbc_sink" violates not-null constraint
详细:失败, 行包含(null, null).
... 14 more
10:45:55.576 [pool-6-thread-1] ERROR org.apache.pulsar.io.jdbc.JdbcAbstractSink - Update count 1 not match total number of records 15
10:46:11.794 [pulsar-timer-6-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [pulsar-postgres-jdbc-sink-topic] [public/default/pulsar-postgres-jdbc-sink] [e4f13] Prefetched messages: 0 --- Consume throughput received: 2.82 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
二: {"id":1,"name":"adsasda"},The tested client publications and subscriptions are also normal.
for example:
1 Role: aaa, authorized to tenant: aaa, namespace: all topics under aaa have all permissions.
2 Role: bbb, authorized to tenant: bbb, namespace: all topics under bbb have all permissions.
Test: Use pulsar protocol connection, role: aaa subscribe and send topic: persistent://bbb/bbb/mytopic is not authorized, but I use mqtt protocol connection, role: aaa, you can subscribe and send data to topic: persistent: //bbb/bbb/mytopic
From the document: 'The hostname in listeners should be the same as Pulsar broker's advertisedAddress'. And the default advertisedAddress
of pulsar is InetAddress.getLocalHost().getHostName()
according to Pulsar configuration.
Currently doesn't appear that MoP supports authentication or authorisation.
Ideally should support authentication and authorisation through existing extensible Pulsar interfaces.
I download verified released source v2.7.1.4
and run mvn clean install -DskipTests
, just got the error.
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-remote-resources-plugin:1.5:process (process-resource-bundles) on project pulsar-protocol-handler-mqtt-parent: Failed to resolve dependencies for one or more projects in the reactor. Reason: Unable to get dependency information for org.apache.pulsar:pulsar-broker:jar:2.7.1.4: Failed to retrieve POM for org.apache.pulsar:pulsar-broker:jar:2.7.1.4: Could not transfer artifact org.apache.pulsar:pulsar-broker:pom:2.7.1.4 from/to bintray-streamnative-maven (https://dl.bintray.com/streamnative/maven): Access denied to: https://dl.bintray.com/streamnative/maven/org/apache/pulsar/pulsar-broker/2.7.1.4/pulsar-broker-2.7.1.4.pom , ReasonPhrase:Forbidden.
[ERROR] org.apache.pulsar:pulsar-broker:jar:2.7.1.4
I wonder is this the design??
mqtt starts the token authentication method and uses the tool to create the token:
bin/pulsar tokens create --secret-key file:///opt/my-secret.key
--subject test-user1
The token is as follows:
eyJhbI1NiJ9.eyJzdWIiOiJ0VzZXIyIn0.c_eE4mh4tOw_K8tKG_lI8FATkgqVTpUsBBrDOTlWg48
Use token to connect to mqtt, but it can subscribe to topic and send topic data at will, how to restrict the sending and subscribing authority to only specified topic
Creating a venue in SmartZone results in a new configuration being added to SmartZone’s own mosquitto broker. The venue configuration at Figure 3 will result in the following lines:
address 201.52.221.208:8883
topic 3.0/LOC/lbstest/LS/# in
topic 3.0/LOC/lbstest/+/MGR out
bridge_tls_version tlsv1
bridge_identity lbstest
bridge_psk 7275636B757331323321
The first line is the IP address of the external location server. The topic lines allow the SZ mosquitto broker to receive results for multiple topics, and to send results ending with MGR to multiple location servers. The SZ mosquitto broker is running TLS version 1.0.
bridge_identity is the user name the SZ mosquitto broker will use to connect to the LS broker, and bridge_pskis the pre-shared key to authenticate the user. It is the hex conversion of the password that was entered when creating the venue. You can use the site https://www.rapidtables.com/convert/number/ascii-to-hex.html for the conversion. In the example above we used ruckus123! as the password/pre-shared key.
Hi, I've added the .nar file to ./protocol in a local build of Pulsar and am getting a class not found exception when running ./bin/pulsar standalone. Your help is greatly appreciated!
13:01:55.325 [pulsar-io-29-1] INFO io.streamnative.pulsar.handlers.mqtt.support.ProtocolMethodProcessorImpl - [Subscribe] [[id: 0xe2f7eae5, L:/127.0.0.1:1883 - R:/127.0.0.1:58408]] msg: MqttSubscribeMessage[fixedHeader=MqttFixedHeader[messageType=SUBSCRIBE, isDup=false, qosLevel=AT_LEAST_ONCE, isRetain=false, remainingLength=41], variableHeader=MqttMessageIdVariableHeader[messageId=1], payload=MqttSubscribePayload[MqttTopicSubscription[topicFilter=persistent://public/default/my-topic, qualityOfService=AT_LEAST_ONCE]]]
13:01:55.325 [pulsar-io-29-1] ERROR io.streamnative.pulsar.handlers.mqtt.support.ProtocolMethodProcessorImpl - [persistent://public/default/my-topic] [7f000001e3e460bf5c32] Failed to create subscription on Pulsar topic.
java.util.concurrent.CompletionException: java.lang.NoClassDefFoundError: org/apache/pulsar/common/api/proto/PulsarApi$CommandSubscribe$InitialPosition
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314) ~[?:?]
at java.util.concurrent.CompletableFuture.uniAcceptNow(CompletableFuture.java:757) [?:?]
at java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:731) [?:?]
at java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2108) [?:?]
at io.streamnative.pulsar.handlers.mqtt.utils.PulsarTopicUtils.getOrCreateSubscription(PulsarTopicUtils.java:44) ~[pulsar-protocol-handler-mqtt-2.8.0-SNAPSHOT.nar-unpacked/:?]
at io.streamnative.pulsar.handlers.mqtt.support.ProtocolMethodProcessorImpl.processSubscribe(ProtocolMethodProcessorImpl.java:268) [pulsar-protocol-handler-mqtt-2.8.0-SNAPSHOT.nar-unpacked/:?]
at io.streamnative.pulsar.handlers.mqtt.MQTTInboundHandler.channelRead(MQTTInboundHandler.java:60) [pulsar-protocol-handler-mqtt-2.8.0-SNAPSHOT.nar-unpacked/:?]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) [netty-codec-4.1.63.Final.jar:4.1.63.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) [netty-codec-4.1.63.Final.jar:4.1.63.Final]
Pulsar protocol connection, send data to topic:persistent://aaa/aaa/my-topic, mqtt protocol subscribes to this topic, no data can be received
See CI of #31
Currently MoP only supports plaintext connections. TLS support should be able to be enabled on a per-listener basis.
The program example in the sins pulse to PostgreSQL document is based on the cluster environment. The database table cannot be successfully inserted in the stand-alone mode. Can you create a new configuration example in the stand-alone mode? In stand-alone mode, pulsar 2.8.1.0, mop 2.8.1.0, pulsar-io-jdbc-postgres-2.8.0.nar
I want to run Pulsar to PostgresQL Sink and Pulsar to postgresQL Sink on a single machine without any exception. I also want to subscribe to the topic messages sent to Pulsar by the test client using MOP 2.8.1.0 and Pulsar 2.8.1.0. Pulsar-io-jdbc-postgres-2.8.0. nar, but messages cannot be written to the postgresQL database table pulsar-postgres-jdbc-sink. Later, I found that there was a Pulsar-Admin sinks localrun options. In the document, there was an example of collocation in the cluster environment, but collocation in the single-machine environment only explained without program examples. Could someone give me an example of running the PostgresQL Sink configuration file in single-machine mode?
pulsar-broker-rabbitmq1.log error
14:32:33.511 [mqtt-redirect-io-44-1] INFO io.streamnative.pulsar.handlers.mqtt.proxy.MQTTProxyProtocolMethodProcessor - processUnSubscribe...
14:32:33.526 [mqtt-redirect-io-44-1] INFO io.streamnative.pulsar.handlers.mqtt.proxy.MQTTProxyExchanger - channel read: MqttMessage[fixedHeader=, variableHeader=, payload=]
14:32:33.526 [mqtt-redirect-io-44-1] ERROR io.streamnative.pulsar.handlers.mqtt.proxy.MQTTProxyExchanger - Failed to decode mqttMessage.
io.netty.handler.codec.DecoderException: Illegal BIT 2 or 1 in fixed header of UNSUBACK message, must be 0, found 1
at io.netty.handler.codec.mqtt.MqttDecoder.decodeFixedHeader(MqttDecoder.java:214) ~[netty-codec-mqtt-4.1.67.Final.jar:4.1.67.Final]
at io.netty.handler.codec.mqtt.MqttDecoder.decode(MqttDecoder.java:88) ~[netty-codec-mqtt-4.1.67.Final.jar:4.1.67.Final]
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507) [io.netty-netty-codec-4.1.66.Final.jar:4.1.66.Final]
at io.netty.handler.codec.ReplayingDecoder.callDecode(ReplayingDecoder.java:366) [io.netty-netty-codec-4.1.66.Final.jar:4.1.66.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276) [io.netty-netty-codec-4.1.66.Final.jar:4.1.66.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) [io.netty-netty-handler-4.1.66.Final.jar:4.1.66.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795) [io.netty-netty-transport-native-epoll-4.1.66.Final.jar:4.1.66.Final]
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480) [io.netty-netty-transport-native-epoll-4.1.66.Final.jar:4.1.66.Final]
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) [io.netty-netty-transport-native-epoll-4.1.66.Final.jar:4.1.66.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) [io.netty-netty-common-4.1.66.Final.jar:4.1.66.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [io.netty-netty-common-4.1.66.Final.jar:4.1.66.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common
macos big sur11.4
apache-pulsar-2.8.0/protocols/
2 versions
pulsar-protocol-handler-mqtt-2.9.0-SNAPSHOT.nar
pulsar-protocol-handler-mqtt-2.8.0.8.nar
vim conf/standalone.conf
messagingProtocols=mqtt
protocolHandlerDirectory=./protocols
brokerServicePort=6651
mqttListeners=mqtt://127.0.0.1:1883
advertisedAddress=127.0.0.1
mqttProxyEnable=true
mqttProxyPort=5682
start pulsar
bin/pulsar standalone
error:
ERROR org.apache.pulsar.PulsarStandaloneStarter - Failed to start pulsar service.
java.lang.NoSuchMethodError: org.apache.pulsar.broker.PulsarService.advertisedAddress(Lorg/apache/pulsar/broker/ServiceConfiguration;)Ljava/lang/String;
at io.streamnative.pulsar.handlers.mqtt.MQTTProtocolHandler.start(MQTTProtocolHandler.java:95) ~[?:?]
at org.apache.pulsar.broker.protocol.ProtocolHandlerWithClassLoader.start(ProtocolHandlerWithClassLoader.java:66) ~[org.apache.pulsar-pulsar-broker-2.8.0.jar:2.8.0]
at org.apache.pulsar.broker.protocol.ProtocolHandlers.lambda$start$4(ProtocolHandlers.java:142) ~[org.apache.pulsar-pulsar-broker-2.8.0.jar:2.8.0]
at java.lang.Iterable.forEach(Iterable.java:75) ~[?:1.8.0_171]
at org.apache.pulsar.broker.protocol.ProtocolHandlers.start(ProtocolHandlers.java:142) ~[org.apache.pulsar-pulsar-broker-2.8.0.jar:2.8.0]
at org.apache.pulsar.broker.PulsarService.start(PulsarService.java:785) ~[org.apache.pulsar-pulsar-broker-2.8.0.jar:2.8.0]
at org.apache.pulsar.PulsarStandalone.start(PulsarStandalone.java:296) ~[org.apache.pulsar-pulsar-broker-2.8.0.jar:2.8.0]
at org.apache.pulsar.PulsarStandaloneStarter.main(PulsarStandaloneStarter.java:121) [org.apache.pulsar-pulsar-broker-2.8.0.jar:2.8.0]
16:12:37.229 [Thread-1] INFO org.apache.pulsar.broker.service.GracefulExecutorServicesTerminationHandler - Starting termination handler for 4 executors.
16:12:37.229 [globalEventExecutor-2-2] INFO org.apache.pulsar.broker.service.GracefulExecutorServicesTerminationHandler - Starting termination handler for 7 executors.
16:12:37.230 [Thread-1] INFO org.apache.pulsar.broker.service.GracefulExecutorServicesTerminationHandler - Shutdown completed.
16:12:37.230 [globalEventExecutor-2-2] INFO org.apache.pulsar.broker.service.GracefulExecutorServicesTerminationHandler - Shutdown completed.
16:12:37.386 [Thread-1] ERROR org.apache.bookkeeper.client.MetadataUpdateLoop - UpdateLoop(ledgerId=855,loopId=41e400ad) Error writing metadata to store
org.apache.bookkeeper.client.BKException$BKClientClosedException: BookKeeper client is closed
at org.apache.bookkeeper.meta.CleanupLedgerManager.close(CleanupLedgerManager.java:245) ~[org.apache.bookkeeper-bookkeeper-server-4.14.1.jar:4.14.1]
at org.apache.bookkeeper.client.BookKeeper.close(BookKeeper.java:1439) ~[org.apache.bookkeeper-bookkeeper-server-4.14.1.jar:4.14.1]
at org.apache.distributedlog.BookKeeperClient.close(BookKeeperClient.java:273) ~[org.apache.distributedlog-distributedlog-core-4.14.1.jar:4.14.1]
at org.apache.distributedlog.impl.BKNamespaceDriver.doClose(BKNamespaceDriver.java:404) ~[org.apache.distributedlog-distributedlog-core-4.14.1.jar:4.14.1]
at org.apache.distributedlog.impl.BKNamespaceDriver.close(BKNamespaceDriver.java:385) ~[org.apache.distributedlog-distributedlog-core-4.14.1.jar:4.14.1]
at com.google.common.io.Closeables.close(Closeables.java:78) ~[com.google.guava-guava-30.1-jre.jar:?]
at org.apache.distributedlog.util.Utils.close(Utils.java:544) ~[org.apache.distributedlog-distributedlog-core-4.14.1.jar:4.14.1]
at org.apache.distributedlog.BKDistributedLogNamespace.close(BKDistributedLogNamespace.java:340) ~[org.apache.distributedlog-distributedlog-core-4.14.1.jar:4.14.1]
at org.apache.bookkeeper.stream.server.service.DLNamespaceProviderService.doClose(DLNamespaceProviderService.java:135) ~[org.apache.bookkeeper-stream-storage-server-4.14.1.jar:4.14.1]
at org.apache.bookkeeper.common.component.AbstractLifecycleComponent.close(AbstractLifecycleComponent.java:123) ~[org.apache.bookkeeper-bookkeeper-common-4.14.1.jar:4.14.1]
at org.apache.bookkeeper.common.component.LifecycleComponentStack.lambda$close$6(LifecycleComponentStack.java:154) ~[org.apache.bookkeeper-bookkeeper-common-4.14.1.jar:4.14.1]
at com.google.common.collect.ImmutableList.forEach(ImmutableList.java:406) [com.google.guava-guava-30.1-jre.jar:?]
at org.apache.bookkeeper.common.component.LifecycleComponentStack.close(LifecycleComponentStack.java:154) [org.apache.bookkeeper-bookkeeper-common-4.14.1.jar:4.14.1]
at org.apache.bookkeeper.stream.server.StreamStorageLifecycleComponent.doClose(StreamStorageLifecycleComponent.java:60) [org.apache.bookkeeper-stream-storage-server-4.14.1.jar:4.14.1]
at org.apache.bookkeeper.common.component.AbstractLifecycleComponent.close(AbstractLifecycleComponent.java:123) [org.apache.bookkeeper-bookkeeper-common-4.14.1.jar:4.14.1]
at org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble.stop(LocalBookkeeperEnsemble.java:492) [org.apache.pulsar-pulsar-zookeeper-utils-2.8.0.jar:2.8.0]
at org.apache.pulsar.PulsarStandaloneStarter$1.run(PulsarStandaloneStarter.java:102) [org.apache.pulsar-pulsar-broker-2.8.0.jar:2.8.0]
Using mqtt-client can not connect to the 5682 proxy port, the connection has not been returned,appears after opening authentication, the code is as follows:
MQTT mqtt = new MQTT();
mqtt.setHost("127.0.0.1", 5682);
mqtt.setUserName("aaaa");
mqtt.setPassword("eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJhYWEifQ.L9h2m6Mpr1NM7JD8i2YuzKfusuht6fI6BYM39pgZZKs");
BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
Topic[] topics = { new Topic("persistent://public/default/my-topic", QoS.AT_LEAST_ONCE) };
connection.subscribe(topics);
// publish message
connection.publish("persistent://public/default/my-topic", "Hello MOP!".getBytes(), QoS.AT_LEAST_ONCE, false);
// receive message
Message received = connection.receive();
Use mqtt protocol to connect, set will, after disconnecting, will not receive will
Add
brokerentrymetadatainterceptors = org. Apache. Pulsar. Common. Intercept. Appendindexmetadatainterceptor
to the configuration file
Cannot subscribe to published messages through MQTT
1:此前工作中环境为消息代理服务器 MQTT BROKER 是mosquitto mqtt和客户端MQTT.FX。2:希望pulsar MOP 中是否需要持久化可以选择。3:外部客户端搜索相关主题可以搜索到。搜索结果显示可以设定默认数目,未有关键字符的搜索可以默认搜索最近100条主题。
https://mosquitto.org/download/
I want to some metrics of mqtt, like online_client_count
, disconnected_client_count
in 5 min, and seems like we can't get this metrics from pulsar api of {PULSAR_IP}:{PORT}/metrics
.
Any idea for this? Alone start http for metrics? I even hope that the metrics of pulsar have only one entrance -> {PULSAR_IP}:{PORT}/metrics
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.