MQTT 5.0 and 3.1.1 compatible and feature-rich high-performance Java client library with different API flavours and backpressure support.
A detailed documentation can be found here
- All MQTT 3.1.1 and MQTT 5.0 features
- API flavors:
- Reactive, Async and Blocking
- Flexible switching
- Consistent and clearly separated
- Backpressure support:
- QoS 1 and 2
- QoS 0 (dropping incoming messages if necessary)
- SSL/TLS
- WebSocket
- Automatic and configurable thread management
- MQTT 5 specific:
- Pluggable Enhanced Auth support
- Additional to MQTT specification: server-triggered reauth
- Automatic Topic Alias mapping
- Interceptors for QoS flows
- Pluggable Enhanced Auth support
- Automatic reconnect handling and message redelivery
Java 8 or higher is required.
If you use Gradle, just include the following inside your build.gradle
file.
dependencies {
compile group: 'com.hivemq', name: 'hivemq-mqtt-client', version: '1.0.1'
}
If you use Maven, just include the following inside your pom.xml
file.
NOTE: You have to set the compiler version to 1.8
or higher.
<project>
...
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>com.hivemq</groupId>
<artifactId>hivemq-mqtt-client</artifactId>
<version>1.0.1</version>
</dependency>
</dependencies>
...
</project>
If you are experiencing problems with transitive dependencies, you can try the shaded version.
This version packs the transitive dependencies which are only used internal under a different package name.
To use the shaded version just append -shaded
to the artifact name.
dependencies {
compile group: 'com.hivemq', name: 'hivemq-mqtt-client-shaded', version: '1.0.1'
}
<project>
...
<dependencies>
<dependency>
<groupId>com.hivemq</groupId>
<artifactId>hivemq-mqtt-client-shaded</artifactId>
<version>1.0.1</version>
</dependency>
</dependencies>
...
</project>
Every time a PR is merged into the develop
branch, a new snapshot is published.
A snapshot can be included as a normal dependency if the snapshot repository is added to the build file.
IMPORTANT: The snapshot versions are not available for now.
repositories {
jcenter()
mavenCentral()
maven { url 'https://oss.jfrog.org/artifactory/oss-snapshot-local' }
}
dependencies {
compile group: 'com.hivemq', name: 'hivemq-mqtt-client', version: '1.0.1-SNAPSHOT'
}
<project>
...
<repositories>
<repository>
<id>oss.jfrog.org</id>
<url>https://oss.jfrog.org/artifactory/oss-snapshot-local</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>com.hivemq</groupId>
<artifactId>hivemq-mqtt-client</artifactId>
<version>1.0.1-SNAPSHOT</version>
</dependency>
</dependencies>
...
</project>
- API and implementation are clearly separated. All classes inside
internal
packages must not be used directly. - The API is mostly fluent and uses fluent builders to create clients, configurations and messages.
- The API is designed to be consistent:
- The same principles are used throughout the library.
- The MQTT 3 and 5 interfaces are as consistent as possible with only version-specific differences.
Base classes: Mqtt3Client
, Mqtt5Client
Mqtt5Client client = MqttClient.builder()
.identifier(UUID.randomUUID().toString())
.serverHost("broker.hivemq.com")
.useMqttVersion5()
.build();
Mqtt3Client client = MqttClient.builder()...useMqttVersion3().build();
Or if the version is known upfront:
Mqtt5Client client = Mqtt5Client.builder()...build();
Mqtt3Client client = Mqtt3Client.builder()...build();
For each API style exists a specific build...()
method.
Each API style has its own interface to separate them clearly. At any time it is possible to switch the API style.
- Builder method:
buildBlocking()
- Switch method:
client.toBlocking()
final Mqtt5BlockingClient client = Mqtt5Client.builder()
.identifier(UUID.randomUUID().toString())
.serverHost("broker.hivemq.com")
.buildBlocking();
client.connect();
try (final Mqtt5Publishes publishes = client.publishes(MqttGlobalPublishFilter.ALL)) {
client.subscribeWith().topicFilter("test/topic").qos(MqttQos.AT_LEAST_ONCE).send();
publishes.receive(1, TimeUnit.SECONDS).ifPresent(System.out::println);
publishes.receive(100, TimeUnit.MILLISECONDS).ifPresent(System.out::println);
} finally {
client.disconnect();
}
Mqtt5BlockingClient client = Mqtt5Client.builder()
.identifier(UUID.randomUUID().toString())
.serverHost("broker.hivemq.com")
.buildBlocking();
client.connect();
client.publishWith().topic("test/topic").qos(MqttQos.AT_LEAST_ONCE).payload("1".getBytes()).send();
client.disconnect();
client.connect();
Or with customized properties of the Connect message:
client.connectWith().keepAlive(10).send();
Or with pre-built Connect message:
Mqtt5Connect connectMessage = Mqtt5Connect.builder().keepAlive(10).build();
client.connect(connectMessage);
client.publishWith()
.topic("test/topic")
.qos(MqttQos.AT_LEAST_ONCE)
.payload("payload".getBytes())
.send();
Or with pre-built Publish message:
Mqtt5Publish publishMessage = Mqtt5Publish.builder()
.topic("test/topic")
.qos(MqttQos.AT_LEAST_ONCE)
.payload("payload".getBytes())
.build();
client.publish(publishMessage);
client.subscribeWith().topicFilter("test/topic").qos(MqttQos.EXACTLY_ONCE).send();
Or with pre-built Subscribe message:
Mqtt5Subscribe subscribeMessage = Mqtt5Subscribe.builder()
.topicFilter("test/topic")
.qos(MqttQos.EXACTLY_ONCE)
.build();
client.subscribe(subscribeMessage);
client.unsubscribeWith().topicFilter("test/topic").send();
Or with pre-built Unsubscribe message:
Mqtt5Unsubscribe unsubscribeMessage = Mqtt5Unsubscribe.builder().topicFilter("test/topic").build();
client.unsubscribe(unsubscribeMessage);
try (Mqtt5BlockingClient.Mqtt5Publishes publishes = client.publishes(MqttGlobalPublishFilter.ALL)) {
Mqtt5Publish publishMessage = publishes.receive();
// or with timeout
Optional<Mqtt5Publish> publishMessage = publishes.receive(10, TimeUnit.SECONDS);
// or without blocking
Optional<Mqtt5Publish> publishMessage = publishes.receiveNow();
}
publishes
must be called before subscribe
to ensure no message is lost.
It can be called before connect
to receive messages of a previous session.
client.disconnect();
Or with customized properties of the DISCONNECT message (only MQTT 5):
client.disconnectWith().reasonString("test").send();
Or with pre-built Disconnect message (only MQTT 5):
Mqtt5Disconnect disconnectMessage = Mqtt5Disconnect.builder().disconnectWith().reasonString("test").build();
client.disconnect(disconnectMessage);
client.reauth();
- Builder method:
buildAsync()
- Switch method:
client.toAsync()
Mqtt5BlockingClient client = Mqtt5Client.builder()
.identifier(UUID.randomUUID().toString())
.serverHost("broker.hivemq.com")
.buildBlocking();
client.connect();
client.toAsync().subscribeWith()
.topicFilter("test/topic")
.qos(MqttQos.AT_LEAST_ONCE)
.callback(System.out::println)
.send();
Mqtt5AsyncClient client = Mqtt5Client.builder()
.identifier(UUID.randomUUID().toString())
.serverHost("broker.hivemq.com")
.buildAsync();
client.connect()
.thenCompose(connAck -> client.publishWith().topic("test/topic").payload("1".getBytes()).send())
.thenCompose(publish -> client.disconnect());
connect()
, connectWith()
and connect(Mqtt3/5Connect)
method calls are analog to the Blocking API but return
CompletableFuture
.
publishWith()
and publish(Mqtt3/5Publish)
method calls are analog to the Blocking API but return
CompletableFuture
.
subscribeWith()
and subscribe(Mqtt3/5Subscribe)
method calls are analog to the Blocking API but return
CompletableFuture
.
Additionally messages can be consumed per subscribe:
client.subscribeWith()
.topicFilter("test/topic")
.qos(MqttQos.EXACTLY_ONCE)
.callback(System.out::println)
.executor(executor) // optional
.send();
Or with pre-built Subscribe message:
Mqtt5Subscribe subscribeMessage = Mqtt5Subscribe.builder()
.topicFilter("test/topic")
.qos(MqttQos.EXACTLY_ONCE)
.build();
client.subscribe(subscribeMessage, System.out::println);
client.subscribe(subscribeMessage, System.out::println, executor);
unsubscribeWith()
and unsubscribe(Mqtt3/5Unsubscribe)
method calls are analog to the Blocking API but return
CompletableFuture
.
Messages can either be consumed per subscribe (described above) or globally:
client.publishes(MqttGlobalPublishFilter.ALL, System.out::println);
Or with executing the callback on a specified executor:
client.publishes(MqttGlobalPublishFilter.ALL, System.out::println, executor);
publishes
must be called before subscribe
to ensure no message is lost.
It can be called before connect
to receive messages of a previous session.
disconnect()
, disconnectWith()
and disconnect(Mqtt5Disconnect)
method calls are analog to the Blocking API but
return CompletableFuture
.
reauth()
method call is analog to the Blocking API but return CompletableFuture
.
- Builder method:
buildRx()
- Switch method:
client.toRx()
Mqtt5RxClient client = Mqtt5Client.builder()
.identifier(UUID.randomUUID().toString())
.serverHost("broker.hivemq.com")
.buildRx();
// As we use the reactive API, the following line does not connect yet, but returns a reactive type.
// e.g. Single is something like a lazy and reusable future. Think of it as a source for the ConnAck message.
Single<Mqtt5ConnAck> connAckSingle = client.connect();
// Same here: the following line does not subscribe yet, but returns a reactive type.
// FlowableWithSingle is a combination of the single SubAck message and a Flowable of Publish messages.
// A Flowable is an asynchronous stream that enables backpressure from the application over the client to the broker.
FlowableWithSingle<Mqtt5Publish, Mqtt5SubAck> subAckAndMatchingPublishes = client.subscribeStreamWith()
.topicFilter("a/b/c").qos(MqttQos.AT_LEAST_ONCE)
.addSubscription().topicFilter("a/b/c/d").qos(MqttQos.EXACTLY_ONCE).applySubscription()
.applySubscribe();
// The reactive types offer many operators that will not be covered here.
// Here we register callbacks to print messages when we received the CONNACK, SUBACK and matching PUBLISH messages.
Completable connectScenario = connAckSingle
.doOnSuccess(connAck -> System.out.println("Connected, " + connAck.getReasonCode()))
.doOnError(throwable -> System.out.println("Connection failed, " + throwable.getMessage()))
.ignoreElement();
Completable subscribeScenario = subAckAndMatchingPublishes
.doOnSingle(subAck -> System.out.println("Subscribed, " + subAck.getReasonCodes()))
.doOnNext(publish -> System.out.println(
"Received publish" + ", topic: " + publish.getTopic() + ", QoS: " + publish.getQos() +
", payload: " + new String(publish.getPayloadAsBytes())))
.ignoreElements();
// Reactive types can be easily and flexibly combined
connectScenario.andThen(subscribeScenario).blockingAwait();
Mqtt5RxClient client = Mqtt5Client.builder()
.identifier(UUID.randomUUID().toString())
.serverHost("broker.hivemq.com")
.buildRx();
// As we use the reactive API, the following line does not connect yet, but returns a reactive type.
Completable connectScenario = client.connect()
.doOnSuccess(connAck -> System.out.println("Connected, " + connAck.getReasonCode()))
.doOnError(throwable -> System.out.println("Connection failed, " + throwable.getMessage()))
.ignoreElement();
// Fake a stream of Publish messages with an incrementing number in the payload
Flowable<Mqtt5Publish> messagesToPublish = Flowable.range(0, 10_000)
.map(i -> Mqtt5Publish.builder()
.topic("a/b/c")
.qos(MqttQos.AT_LEAST_ONCE)
.payload(("test " + i).getBytes())
.build())
// Emit 1 message only every 100 milliseconds
.zipWith(Flowable.interval(100, TimeUnit.MILLISECONDS), (publish, i) -> publish);
// As we use the reactive API, the following line does not publish yet, but returns a reactive type.
Completable publishScenario = client.publish(messagesToPublish)
.doOnNext(publishResult -> System.out.println(
"Publish acknowledged: " + new String(publishResult.getPublish().getPayloadAsBytes())))
.ignoreElements();
// As we use the reactive API, the following line does not disconnect yet, but returns a reactive type.
Completable disconnectScenario = client.disconnect().doOnComplete(() -> System.out.println("Disconnected"));
// Reactive types can be easily and flexibly combined
connectScenario.andThen(publishScenario).andThen(disconnectScenario).blockingAwait();
connect()
, connectWith()
and connect(Mqtt3/5Connect)
method calls are analog to the Async and Blocking API but
return Single<ConnAck>
.
publish
takes a reactive stream of Publish messages (Flowable
) and returns a reactive stream of Publish results
(Flowable
).
The Reactive API is usually not used for publishing single messages. Nevertheless it is possible with the following code.
Single<Mqtt5PublishResult> result =
client.publish(Flowable.just(Mqtt5Publish.builder()
.topic("test/topic")
.qos(MqttQos.AT_LEAST_ONCE)
.payload("payload".getBytes())
.build())).singleOrError();
subscribeWith()
and subscribe(Mqtt3/5Subscribe)
method calls are analog to the Async and Blocking API but return
Single<SubAck>
.
Additionally messages can be consumed per subscribe:
Flowable<Mqtt5Publish> result =
client.subscribeStreamWith()
.topicFilter("test/topic")
.qos(MqttQos.EXACTLY_ONCE)
.applySubscribe()
.doOnSingle(subAck -> System.out.println("subscribed"))
.doOnNext(publish -> System.out.println("received publish"));
Or with pre-built Subscribe message:
Mqtt5Subscribe subscribeMessage = Mqtt5Subscribe.builder()
.topicFilter("test/topic")
.qos(MqttQos.EXACTLY_ONCE)
.build();
Flowable<Mqtt5Publish> result =
client.subscribeStreamWith(subscribeMessage)
.doOnSingle(subAck -> System.out.println("subscribed"))
.doOnNext(publish -> System.out.println("received publish"));
unsubscribeWith()
and unsubscribe(Mqtt3/5Unsubscribe)
method calls are analog to the Async and Blocking API but
return Single<UnsubAck>
.
Messages can either be consumed per subscribe (described above) or globally:
Flowable<Mqtt5Publish> result =
client.publishes(MqttGlobalPublishFilter.ALL).doOnNext(System.out::println);
publishes
must be called before subscribe
to ensure no message is lost.
It can be called before connect
to receive messages of a previous session.
disconnect()
, disconnectWith()
and disconnect(Mqtt5Disconnect)
method calls are analog to the Async and Blocking
API but return Completable
.
reauth()
method call is analog to the Async and Blocking API but return Completable
.
Semantic Versioning is used.
All code inside com.hivemq.client.internal
packages must not be used directly. It can change at any time and is not
part of the public API.
See CONTRIBUTING.md