syndim / actix-mqtt-client
A Rust MQTT v3.1.1 client based on the actix framework
License: MIT License
I tried to compile the README sample code, but too many of the types are confusing to me.
I don't know what Cargo.toml should contain for the sample, or why it needs use mio::tcp::TcpStream;.
Hi, @Syndim
So far, this lib works great. I really appreciate it.
Do you have any plans to support MQTT over WebSocket?
I'm using the native_tls and tokio_native_tls crates as follows (the example is a simplified version of my code):
let tcp_stream = TcpStream::connect(mqtt_server_sockaddr).await.unwrap();
let inner_cx = TlsConnector::builder()
    .min_protocol_version(Some(native_tls::Protocol::Tlsv12))
    .build().unwrap();
let outer_cx = tokio_native_tls::TlsConnector::from(inner_cx);
let tls_stream = outer_cx.connect(mqtt_server_hostname, tcp_stream).await.unwrap();
trace!("{:?}", tls_stream);
let (reader, writer) = split(tls_stream);
let client = MqttClient::new(
    reader, writer,
    "device_id",
    MqttOptions::default(),
    MessageActor.start().recipient(),
    ErrorActor.start().recipient(),
    None,
);
client.connect().await.unwrap();
// Waiting for the client to be connected
while client.is_connected().await.unwrap() {
    let delay_time = Instant::now() + Duration::new(1, 0);
    delay_until(delay_time).await;
}
This always ends with an error:
2020-08-14T11:25:53.125Z ERROR actix_mqtt_client::actors::actions::recv > Failed to parse packet: early eof
My guess is that the split TlsStream somehow behaves differently from a TcpStream, and that this causes read/write issues in the reader/writer.
Any ideas on how to set up TLS connectivity properly, if this is my mistake?
Hi, I'm trying to integrate this library into a project of mine and I'm having issues with messages published at QoS level 2.
The issue can be reproduced with your example code. I'm using a Mosquitto broker on the same subnet. Messages sent at level 0 and level 1 are sent and broadcast, but messages at level 2 do not seem to be broadcast.
If the client disconnects immediately, as in the example, level 2 messages are sent but not broadcast.
On the other hand, if the client is not disconnected, i.e. I replace client.disconnect() with a simple Ok(()), the message is correctly broadcast.
There might be a race condition between the client disconnection and the completion of the message publication.
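For context on why such a race is plausible: in MQTT 3.1.1 a QoS 2 publish is a four-packet exchange (PUBLISH, PUBREC, PUBREL, PUBCOMP), and a receiver is allowed to defer onward delivery until it has received PUBREL. The sketch below models only the sender side of that handshake. The type and function names are hypothetical and are not the crate's API; it is an illustration of the idea of waiting for in-flight publishes before disconnecting, not a description of how the library works.

```rust
/// Sender-side state of one QoS 2 publish.
/// Hypothetical names for illustration, not types from actix-mqtt-client.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Qos2State {
    /// PUBLISH has been sent; waiting for PUBREC.
    AwaitingPubrec,
    /// PUBREC received and PUBREL sent; waiting for PUBCOMP.
    AwaitingPubcomp,
    /// PUBCOMP received; the exchange is finished.
    Complete,
}

/// Acknowledgements the sender can receive during the exchange.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Ack {
    Pubrec,
    Pubcomp,
}

/// Advance the handshake by one acknowledgement.
/// Acknowledgements that do not match the current state are ignored.
fn advance(state: Qos2State, ack: Ack) -> Qos2State {
    match (state, ack) {
        (Qos2State::AwaitingPubrec, Ack::Pubrec) => Qos2State::AwaitingPubcomp,
        (Qos2State::AwaitingPubcomp, Ack::Pubcomp) => Qos2State::Complete,
        (other, _) => other,
    }
}

/// A disconnect is only "safe" once every in-flight publish is complete.
fn safe_to_disconnect(in_flight: &[Qos2State]) -> bool {
    in_flight.iter().all(|s| *s == Qos2State::Complete)
}

fn main() {
    let mut state = Qos2State::AwaitingPubrec;
    // Disconnecting here would drop the connection before PUBREL is sent.
    assert!(!safe_to_disconnect(&[state]));

    state = advance(state, Ack::Pubrec);
    assert!(!safe_to_disconnect(&[state]));

    state = advance(state, Ack::Pubcomp);
    assert!(safe_to_disconnect(&[state]));
    println!("final state: {:?}", state);
}
```

If the behaviour in the report matches this model, a disconnect issued right after the publish call returns would land while the exchange is still in its first state, which would be consistent with the message being sent but never broadcast.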
Hi, @Syndim:
Could you please consider lowering some log messages to debug or trace level?
[2020-03-09T17:56:46Z INFO actix_mqtt_client::actors::packets::pingreq] Start to send ping
[2020-03-09T17:56:46Z INFO actix_mqtt_client::actors::packets::pingreq] Server not connected yet, do nothing.
[2020-03-09T17:56:46Z INFO actix_mqtt_client::actors::packets::pingreq] Start to send ping
[2020-03-09T17:56:46Z INFO actix_mqtt_client::actors::packets::pingreq] Server not connected yet, do nothing.
[2020-03-09T17:56:46Z INFO actix_mqtt_client::actors::packets::publish] Handle message for SendPublishActor
[2020-03-09T17:56:46Z INFO actix_mqtt_client::actors::packets::publish] Handle message for SendPublishActor
[2020-03-09T17:56:46Z INFO actix_mqtt_client::actors::packets::publish] Handle message for SendPublishActor
[2020-03-09T17:56:46Z INFO actix_mqtt_client::actors::packets::publish] Handle message for SendPublishActor
Messages like "Start to send ping" and "Handle message for ..." would be better at debug or trace level. What do you think?
As a first step, I added a loop to publish messages continuously, and it works:
`
let mut total_count = 0;
let mut rng = rand::thread_rng();
loop {
    total_count += 1;
    if total_count > 1000 { break; }
    println!("Publish: {}", total_count);
    let payload = json!({
        "temperature": 707 + rng.gen_range(0, 70),
        "humidity": 202 + rng.gen_range(0, 20),
    });
    let payload = serde_json::to_string(&payload).unwrap();
    println!("payload:{}", payload);
    client
        .publish(
            //String::from("test"),
            TOPIC.to_owned(),
            QualityOfService::Level0,
            Vec::from(payload.as_bytes()),
        )
        .await?;
    println!("Sending");
    // The delay below is 1 second, so the messages say 1s.
    println!("Wait for 1s");
    let delay_time = Instant::now() + Duration::new(1, 0);
    delay_until(delay_time).await;
    println!("Sent successfully, delayed 1s");
}
`
The second step is to simulate multiple clients by adding a for loop:
`
for client in clients {
    let future = ...
    let result = async move {
        // new mqtt-client
        // loop { sending msg }
    }
    Arbiter::spawn(future);
}
`
After adding the for loop, every client is only able to send its first message, and then the Actor stops. The error message is Custom { kind: Interrupted, error: "Mailbox has closed" }. I couldn't figure out why the Actor was stopped.
I would appreciate your help.
Hello, I'm a French computer science student using your library for a personal project.
When I use it, the library itself does not compile. I don't know whether you still support this library, but I'm sending a message in a bottle!
To help you debug, here is the error I get:
error[E0310]: the parameter type `TStatusPayload` may not live long enough
  --> C:\Users\***\.cargo\registry\src\index.crates.io-6f17d22bba15001f\actix-mqtt-client-0.4.4\src\actors\packets\mod.rs:73:23
   |
73 |     status_recipient: &Recipient<StatusOperationMessage<TStatusPayload>>,
   |                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   |                       |
   |                       the parameter type `TStatusPayload` must be valid for the static lifetime...
   |                       ...so that the type `TStatusPayload` will meet its required lifetime bounds
   |
help: consider adding an explicit lifetime bound
   |
84 |     TStatusPayload: Send + 'static,
   |                          +++++++++

error[E0310]: the parameter type `TStatusPayload` may not live long enough
   --> C:\Users\***\.cargo\registry\src\index.crates.io-6f17d22bba15001f\actix-mqtt-client-0.4.4\src\actors\packets\mod.rs:128:23
    |
128 |     status_recipient: &Recipient<StatusOperationMessage<TStatusPayload>>,
    |                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |                       |
    |                       the parameter type `TStatusPayload` must be valid for the static lifetime...
    |                       ...so that the type `TStatusPayload` will meet its required lifetime bounds
    |
help: consider adding an explicit lifetime bound
    |
139 |     TStatusPayload: Send + 'static,
    |                          +++++++++

error[E0310]: the parameter type `TStatusPayload` may not live long enough
   --> C:\Users\***\.cargo\registry\src\index.crates.io-6f17d22bba15001f\actix-mqtt-client-0.4.4\src\actors\packets\mod.rs:153:23
    |
153 |     status_recipient: &Recipient<StatusOperationMessage<TStatusPayload>>,
    |                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |                       |
    |                       the parameter type `TStatusPayload` must be valid for the static lifetime...
    |                       ...so that the type `TStatusPayload` will meet its required lifetime bounds
    |
help: consider adding an explicit lifetime bound
    |
164 |     TStatusPayload: Send + 'static,
    |                          +++++++++

For more information about this error, try `rustc --explain E0310`.
error: could not compile `actix-mqtt-client` (lib) due to 3 previous errors
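To illustrate what the compiler is asking for, here is a minimal, standalone sketch of the same shape of error and the fix its help text suggests. The `Recipient` and `StatusMessage` types below are hypothetical stand-ins written for this example; they are not the actix or actix-mqtt-client types, and how the real `Recipient` imposes its bound is not shown in the report. The only assumption is that the recipient requires a `'static` message type, which is what the diagnostic implies.

```rust
/// Hypothetical stand-in for a message wrapper such as
/// `StatusOperationMessage<T>`; not the crate's type.
#[allow(dead_code)]
struct StatusMessage<T>(T);

/// Hypothetical stand-in for an actor recipient. It is assumed here to
/// require a `'static` message type, as the diagnostic above implies.
struct Recipient<M: 'static> {
    delivered: Vec<M>,
}

/// Counts the messages held by the recipient.
fn pending<T>(recipient: &Recipient<StatusMessage<T>>) -> usize
where
    // This is the bound from the compiler's help text. With only
    // `T: Send`, a compiler like the one in the report rejects the
    // parameter type above with E0310.
    T: Send + 'static,
{
    recipient.delivered.len()
}

fn main() {
    let mut recipient = Recipient { delivered: Vec::new() };
    recipient.delivered.push(StatusMessage(42u32));
    println!("pending messages: {}", pending(&recipient));
}
```

Note that the three reported locations are inside the crate's own source (mod.rs lines 84, 139 and 164 in the help text), so the bound would have to be added in a patched copy of actix-mqtt-client rather than in the application code.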
I don't think my code is the problem (it is basically the code given in the documentation), but here it is anyway.
use std::io::Error as IoError;
use std::net::SocketAddr;
use std::str::FromStr;
use std::time::Duration;
use actix::{Actor, Arbiter, System};
use env_logger;
use tokio::io::split;
use tokio::net::TcpStream;
use tokio::time::{sleep_until, Instant};
use actix_mqtt_client;

/*----------------------------------------------------- Actor ----------------------------------------------------- */
pub struct ErrorActor;

impl actix::Actor for ErrorActor {
    type Context = actix::Context<Self>;
}

impl actix::Handler<ErrorMessage> for ErrorActor {
    type Result = ();
    fn handle(&mut self, error: ErrorMessage, _: &mut Self::Context) -> Self::Result {
        log::error!("{}", error.0);
    }
}

pub struct MessageActor;

impl actix::Actor for MessageActor {
    type Context = actix::Context<Self>;
}

impl actix::Handler<PublishMessage> for MessageActor {
    type Result = ();
    fn handle(
        &mut self,
        msg: PublishMessage,
        _: &mut Self::Context,
    ) -> Self::Result {
        log::info!(
            "Got message: id:{}, topic: {}, payload: {:?}",
            msg.id,
            msg.topic_name,
            msg.payload
        );
    }
}

/*----------------------------------------------------- Main ----------------------------------------------------- */
#[actix::main]
fn main() {
    let sys = System::new();
    let socket_addr = SocketAddr::from_str("127.0.0.1:1883").unwrap();
    sys.block_on(async move {
        let result = async move {
            let stream = TcpStream::connect(socket_addr).await?;
            let (r, w) = split(stream);
            log::info!("TCP connected");
            let mut client = MqttClient::new(
                r,
                w,
                String::from("test"),
                MqttOptions::default(),
                MessageActor.start().recipient(),
                ErrorActor.start().recipient(),
                None,
            );
            client.connect().await?;
            // Waiting for the client to be connected
            while !client.is_connected().await? {
                let delay_time = Instant::now() + Duration::new(1, 0);
                sleep_until(delay_time).await;
            }
            log::info!("MQTT connected");
            log::info!("Subscribe");
            client
                .subscribe(String::from("test"), mqtt::QualityOfService::Level2)
                .await?;
            log::info!("Publish");
            client
                .publish(
                    String::from("test"),
                    mqtt::QualityOfService::Level0,
                    Vec::from("test".as_bytes()),
                )
                .await?;
            log::info!("Wait for 10s");
            let delay_time = Instant::now() + Duration::new(10, 0);
            sleep_until(delay_time).await;
            client
                .publish(
                    String::from("test"),
                    mqtt::QualityOfService::Level1,
                    Vec::from("test2".as_bytes()),
                )
                .await?;
            log::info!("Wait for 10s");
            let delay_time = Instant::now() + Duration::new(10, 0);
            sleep_until(delay_time).await;
            client
                .publish(
                    String::from("test"),
                    mqtt::QualityOfService::Level2,
                    Vec::from("test3".as_bytes()),
                )
                .await?;
            log::info!("Wait for 10s");
            let delay_time = Instant::now() + Duration::new(10, 0);
            sleep_until(delay_time).await;
            log::info!("Disconnect");
            client.disconnect(false).await?;
            log::info!("Check if disconnect is successful");
            Ok(assert_eq!(true, client.is_disconnected())) as Result<(), IoError>
        }
        .await;
        result.unwrap()
    });
    sys.run().unwrap();
}
Is there any plan to support tokio 1.6 and actix 0.11?
Thanks.
I'm trying to connect to Azure IoT Hub, which listens on port 8883 and enforces TLS.
I didn't find any option for setting up a CA certificate for TLS communication.
Could you please show me a quick example?
Thanks.
Why it's important: https://lucumr.pocoo.org/2020/1/1/async-pressure/