actix-mqtt-client's People

Contributors

jackh726, syndim, xudesheng

actix-mqtt-client's Issues

Could you provide an example project?

I tried to compile the README sample code, but too many of the types are confusing to me.

I don't know what Cargo.toml should contain for the sample, or whether it needs use mio::tcp::TcpStream;.

TlsStream causes "early eof"?

I'm using the native_tls and tokio_native_tls crates as follows (this is a simplified version of my code):

        let tcp_stream = TcpStream::connect(mqtt_server_sockaddr).await.unwrap();

        let inner_cx = TlsConnector::builder()
                .min_protocol_version(Some(native_tls::Protocol::Tlsv12))
                .build().unwrap();

        let outer_cx = tokio_native_tls::TlsConnector::from(inner_cx);
        let tls_stream = outer_cx.connect(mqtt_server_hostname, tcp_stream).await.unwrap();

        trace!("{:?}", tls_stream);
        let (reader, writer) = split(tls_stream);

        let client = MqttClient::new(
            reader, writer,
            "device_id",
            MqttOptions::default(),
            MessageActor.start().recipient(),
            ErrorActor.start().recipient(),
            None,
        );

        client.connect().await.unwrap();

        // Waiting for the client to be connected
        while client.is_connected().await.unwrap() {
            let delay_time = Instant::now() + Duration::new(1, 0);
            delay_until(delay_time).await;
        }

This always ends with an error:

2020-08-14T11:25:53.125Z ERROR actix_mqtt_client::actors::actions::recv > Failed to parse packet: early eof

My guess is that a split TlsStream somehow behaves differently from a split TcpStream, and that this causes read/write issues in the reader/writer.

If this is my mistake, any ideas on how to set up the TLS connection properly?

Messages published with quality level 2 are not broadcast

Hi, I'm trying to integrate this library into a project of mine and I'm having issues with messages published at QoS level 2.

The issue can be reproduced with your example code. I'm using a Mosquitto broker on the same subnet. Messages published at level 0 and level 1 are sent and broadcast, but level 2 messages do not seem to be broadcast.

If the client disconnects immediately, as in the example, level 2 messages are sent but not broadcast.
On the other hand, if the client is not disconnected, i.e. I replace client.disconnect() with a simple Ok(()), the message is broadcast correctly.

There might be a race condition between the client disconnecting and the message publication completing.
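
For context on why such a race is plausible: in MQTT 3.1.1 (section 4.3.3), a QoS 2 publication is a four-packet handshake (PUBLISH, PUBREC, PUBREL, PUBCOMP), and a broker is allowed to hold the message until it receives PUBREL. A DISCONNECT sent right after PUBLISH could therefore cut the handshake short. The sketch below models the sender side of that handshake. All names in it (`Qos2State`, `BrokerPacket`, `on_packet`, `safe_to_disconnect`) are hypothetical and are not part of actix-mqtt-client's API; it only illustrates the idea of delaying the disconnect until every in-flight publication has completed.

```rust
/// Sender-side state of one QoS 2 publication (MQTT 3.1.1, section 4.3.3).
/// Hypothetical type for illustration; not part of actix-mqtt-client.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Qos2State {
    /// PUBLISH was sent; waiting for the broker's PUBREC.
    AwaitingPubRec,
    /// PUBREL was sent in reply to PUBREC; waiting for the broker's PUBCOMP.
    AwaitingPubComp,
    /// PUBCOMP was received; the handshake is finished.
    Complete,
}

/// The two acknowledgements the broker sends back during the handshake.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BrokerPacket {
    PubRec,
    PubComp,
}

/// Advances the handshake by one step. Packets that do not match the
/// current state leave the state unchanged.
pub fn on_packet(state: Qos2State, packet: BrokerPacket) -> Qos2State {
    match (state, packet) {
        (Qos2State::AwaitingPubRec, BrokerPacket::PubRec) => Qos2State::AwaitingPubComp,
        (Qos2State::AwaitingPubComp, BrokerPacket::PubComp) => Qos2State::Complete,
        (other, _) => other,
    }
}

/// Hypothetical guard: disconnecting is only safe once every in-flight
/// QoS 2 publication has reached `Complete`.
pub fn safe_to_disconnect(in_flight: &[Qos2State]) -> bool {
    in_flight.iter().all(|state| *state == Qos2State::Complete)
}
```

With a guard like this, a caller would wait (or poll) until `safe_to_disconnect` returns true before disconnecting, instead of disconnecting as soon as the publish call returns.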

Log level adjustment

Hi @Syndim,
Could you please consider lowering the level of some log messages to debug or trace?
[2020-03-09T17:56:46Z INFO actix_mqtt_client::actors::packets::pingreq] Start to send ping
[2020-03-09T17:56:46Z INFO actix_mqtt_client::actors::packets::pingreq] Server not connected yet, do nothing.
[2020-03-09T17:56:46Z INFO actix_mqtt_client::actors::packets::pingreq] Start to send ping
[2020-03-09T17:56:46Z INFO actix_mqtt_client::actors::packets::pingreq] Server not connected yet, do nothing.
[2020-03-09T17:56:46Z INFO actix_mqtt_client::actors::packets::publish] Handle message for SendPublishActor
[2020-03-09T17:56:46Z INFO actix_mqtt_client::actors::packets::publish] Handle message for SendPublishActor
[2020-03-09T17:56:46Z INFO actix_mqtt_client::actors::packets::publish] Handle message for SendPublishActor
[2020-03-09T17:56:46Z INFO actix_mqtt_client::actors::packets::publish] Handle message for SendPublishActor
Messages like Start to send ping and Handle message for ... would be better at debug or trace level. What do you think?

{ kind: Interrupted, error: "Mailbox has closed" }

As a first step, I added a loop that continuously publishes messages, and it works:

    let mut total_count = 0;
    let mut rng = rand::thread_rng();

    loop {
        // Stop after 1000 messages.
        total_count += 1;
        if total_count > 1000 {
            break;
        }

        println!("Publish: {}", total_count);
        let payload = json!({
            "temperature": 707 + rng.gen_range(0, 70),
            "humidity": 202 + rng.gen_range(0, 20),
        });

        let payload = serde_json::to_string(&payload).unwrap();
        println!("payload:{}", payload);
        client
            .publish(
                //String::from("test"),
                TOPIC.to_owned(),
                QualityOfService::Level0,
                Vec::from(payload.as_bytes()),
            )
            .await?;
        println!("Sending");
        // The delay is 1 second (Duration::new(1, 0)).
        println!("Wait for 1s");
        let delay_time = Instant::now() + Duration::new(1, 0);
        delay_until(delay_time).await;
        println!("Sent successfully, delayed 1s");
    }


The second step was to simulate multiple clients by wrapping this in a for loop (pseudocode):

    for client in clients {
        let future = async move {
            // create a new MqttClient
            // loop { publish messages as above }
        };
        Arbiter::spawn(future);
    }

After adding the for loop, each client can send only its first message, and then the actor stops. The error message is Custom { kind: Interrupted, error: "Mailbox has closed" }. I couldn't figure out why the actor was stopped.

I'd appreciate your help.
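
As general background, a "Mailbox has closed" error means the receiving side of a message channel no longer exists when a message is sent. The sketch below reproduces that mechanism with a plain `std::sync::mpsc` channel. It is only an analogy: it does not use actix, the function name is made up for illustration, and it does not establish why the actor stops in the multi-client setup above.

```rust
use std::sync::mpsc;

/// Sends one message while the receiver is alive and one after it has been
/// dropped. Returns whether each send succeeded.
/// Hypothetical helper for illustration; the receiver plays the role of an
/// actor's mailbox.
pub fn send_before_and_after_stop() -> (bool, bool) {
    let (sender, receiver) = mpsc::channel::<&str>();

    // The "actor" is alive: the message is accepted and then handled.
    let first = sender.send("first").is_ok();
    let _handled = receiver.recv();

    // The "actor" stops: its mailbox (the receiver) is dropped.
    drop(receiver);

    // Any later send fails because nobody can receive the message anymore.
    let second = sender.send("second").is_ok();

    (first, second)
}
```

If the same pattern applies here, the thing to look for is whatever owns the client's actors inside each spawned future, and whether it is dropped or stopped after the first publish.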

The library from crates.io does not compile.

Hello, I'm a French computer science student using your library for a personal project.

The library itself does not seem to compile. I don't know whether you still maintain it, but I'm sending out a message in a bottle!

To help with debugging, here is the error I get.

The error message:

error[E0310]: the parameter type `TStatusPayload` may not live long enough
  --> C:\Users\***\.cargo\registry\src\index.crates.io-6f17d22bba15001f\actix-mqtt-client-0.4.4\src\actors\packets\mod.rs:73:23
   |
73 |     status_recipient: &Recipient<StatusOperationMessage<TStatusPayload>>,
   |                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   |                       |
   |                       the parameter type `TStatusPayload` must be valid for the static lifetime...
   |                       ...so that the type `TStatusPayload` will meet its required lifetime bounds
   |
help: consider adding an explicit lifetime bound
   |
84 |     TStatusPayload: Send + 'static,
   |                          +++++++++

error[E0310]: the parameter type `TStatusPayload` may not live long enough
   --> C:\Users\***\.cargo\registry\src\index.crates.io-6f17d22bba15001f\actix-mqtt-client-0.4.4\src\actors\packets\mod.rs:128:23
    |
128 |     status_recipient: &Recipient<StatusOperationMessage<TStatusPayload>>,
    |                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |                       |
    |                       the parameter type `TStatusPayload` must be valid for the static lifetime...
    |                       ...so that the type `TStatusPayload` will meet its required lifetime bounds
    |
help: consider adding an explicit lifetime bound
    |
139 |     TStatusPayload: Send + 'static,
    |                          +++++++++

error[E0310]: the parameter type `TStatusPayload` may not live long enough
   --> C:\Users\***\.cargo\registry\src\index.crates.io-6f17d22bba15001f\actix-mqtt-client-0.4.4\src\actors\packets\mod.rs:153:23
    |
153 |     status_recipient: &Recipient<StatusOperationMessage<TStatusPayload>>,
    |                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |                       |
    |                       the parameter type `TStatusPayload` must be valid for the static lifetime...
    |                       ...so that the type `TStatusPayload` will meet its required lifetime bounds
    |
help: consider adding an explicit lifetime bound
    |
164 |     TStatusPayload: Send + 'static,
    |                          +++++++++

For more information about this error, try `rustc --explain E0310`.
error: could not compile `actix-mqtt-client` (lib) due to 3 previous errors
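
The compiler's own suggestion is to add a `'static` bound to `TStatusPayload` at lines 84, 139 and 164 of `src/actors/packets/mod.rs`. The sketch below shows that pattern in isolation. `Recipient` and `StatusOperationMessage` here are simplified stand-ins written for this example, not the real actix or actix-mqtt-client types, and `send_status` is a hypothetical function. On recent compilers, removing `'static` from the `where` clause is expected to reproduce E0310 on the parameter type.

```rust
/// Stand-in for a recipient that, like an actor mailbox, only accepts
/// messages that are `Send` and contain no borrowed data (`'static`).
pub struct Recipient<M: Send + 'static> {
    delivered: Vec<M>,
}

impl<M: Send + 'static> Recipient<M> {
    pub fn new() -> Self {
        Recipient { delivered: Vec::new() }
    }

    /// Messages delivered so far, oldest first.
    pub fn delivered(&self) -> &[M] {
        &self.delivered
    }
}

/// Stand-in for the message wrapper named in the error output.
pub struct StatusOperationMessage<T>(pub T);

/// Because `Recipient<M>` requires `M: 'static`, the payload type inside the
/// message must be `'static` too, and the bound has to be written explicitly.
pub fn send_status<TStatusPayload>(
    status_recipient: &mut Recipient<StatusOperationMessage<TStatusPayload>>,
    payload: TStatusPayload,
) where
    // This is the bound the compiler asks for in the help text.
    TStatusPayload: Send + 'static,
{
    status_recipient
        .delivered
        .push(StatusOperationMessage(payload));
}
```

Since the failing code lives in the dependency rather than in the application, applying such a fix would mean building against a patched copy of the crate, for example through Cargo's `[patch.crates-io]` section.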

My code:

I don't think my code is the problem, since it is basically the code given in the documentation, but here it is anyway.

use std::io::Error as IoError;
use std::net::SocketAddr;
use std::str::FromStr;
use std::time::Duration;
use actix::{Actor, Arbiter, System};
use env_logger;
use tokio::io::split;
use tokio::net::TcpStream;
use tokio::time::{sleep_until, Instant};
use actix_mqtt_client;

/*----------------------------------------------------- Actor ----------------------------------------------------- */

pub struct ErrorActor;

impl actix::Actor for ErrorActor {
    type Context = actix::Context<Self>;
}

impl actix::Handler<ErrorMessage> for ErrorActor {
    type Result = ();
    fn handle(&mut self, error: ErrorMessage, _: &mut Self::Context) -> Self::Result {
        log::error!("{}", error.0);
    }
}

pub struct MessageActor;

impl actix::Actor for MessageActor {
    type Context = actix::Context<Self>;
}

impl actix::Handler<PublishMessage> for MessageActor {
    type Result = ();
    fn handle(
        &mut self,
        msg: PublishMessage,
        _: &mut Self::Context,
    ) -> Self::Result {
        log::info!(
            "Got message: id:{}, topic: {}, payload: {:?}",
            msg.id,
            msg.topic_name,
            msg.payload
        );
    }
}

/*----------------------------------------------------- Main ----------------------------------------------------- */

#[actix::main]
fn main() {
    let sys = System::new();
    let socket_addr = SocketAddr::from_str("127.0.0.1:1883").unwrap();
    sys.block_on(async move {
        let result = async move {
            let stream = TcpStream::connect(socket_addr).await?;
            let (r, w) = split(stream);
            log::info!("TCP connected");
            let mut client = MqttClient::new(
                r,
                w,
                String::from("test"),
                MqttOptions::default(),
                MessageActor.start().recipient(),
                ErrorActor.start().recipient(),
                None,
            );
            client.connect().await?;

            // Waiting for the client to be connected
            while !client.is_connected().await? {
                let delay_time = Instant::now() + Duration::new(1, 0);
                sleep_until(delay_time).await;
            }

            log::info!("MQTT connected");
            log::info!("Subscribe");
            client
                .subscribe(String::from("test"), mqtt::QualityOfService::Level2)
                .await?;
            log::info!("Publish");
            client
                .publish(
                    String::from("test"),
                    mqtt::QualityOfService::Level0,
                    Vec::from("test".as_bytes()),
                )
                .await?;
            log::info!("Wait for 10s");
            let delay_time = Instant::now() + Duration::new(10, 0);
            sleep_until(delay_time).await;
            client
                .publish(
                    String::from("test"),
                    mqtt::QualityOfService::Level1,
                    Vec::from("test2".as_bytes()),
                )
                .await?;
            log::info!("Wait for 10s");
            let delay_time = Instant::now() + Duration::new(10, 0);
            sleep_until(delay_time).await;
            client
                .publish(
                    String::from("test"),
                    mqtt::QualityOfService::Level2,
                    Vec::from("test3".as_bytes()),
                )
                .await?;
            log::info!("Wait for 10s");
            let delay_time = Instant::now() + Duration::new(10, 0);
            sleep_until(delay_time).await;
            log::info!("Disconnect");
            client.disconnect(false).await?;
            log::info!("Check if disconnect is successful");
            Ok(assert_eq!(true, client.is_disconnected())) as Result<(), IoError>
        }
        .await;
        result.unwrap()
    });
    sys.run().unwrap();
}

Support TLS connection on port 8883

I'm trying to connect to Azure IoT Hub, which listens on port 8883 and enforces TLS.

I couldn't find any option to set up a CA certificate for TLS communication.

Could you please show me a quick example?

Thanks.
