Coder Social home page Coder Social logo

reqwest-eventsource's Introduction

reqwest-eventsource

Provides a simple wrapper for [reqwest] to provide an Event Source implementation. You can learn more about Server Sent Events (SSE) take a look at the MDN docs This crate uses [eventsource_stream] to wrap the underlying Bytes stream, and retries failed requests.

Example

let mut es = EventSource::get("http://localhost:8000/events");
while let Some(event) = es.next().await {
    match event {
        Ok(Event::Open) => println!("Connection Open!"),
        Ok(Event::Message(message)) => println!("Message: {:#?}", message),
        Err(err) => {
            println!("Error: {}", err);
            es.close();
        }
    }
}

License: MIT OR Apache-2.0

reqwest-eventsource's People

Contributors

fenhl avatar jpopesculian avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar

reqwest-eventsource's Issues

Tokio 1.0 upgrade

Hi, do you plan to merge current active pull request?
Also, I failed to make the client work with warp framework based SSE server, have you guys tested the client?

Non-200 responses should throw errors

According to the spec, the event stream should be parsed only if the status code of the response is 200. That's currently not implemented.

Regarding other codes, 301 and 307 redirections must be followed (already handled properly). 204 is a hint telling the client to not reconnect (not handled). Any other status code must throw an error.

I fear that a new API is required to handle this properly (#6 could probably be fixed by this redesign too). If you want @jpopesculian, I can try to work on a patch.

Why is StreamEnded an Error condition

Poll::Ready(Some(Err(err))) => { <-- these are legit errors
let err = err.into();
this.handle_error(&err);
Poll::Ready(Some(Err(err)))
}
Poll::Ready(Some(Ok(event))) => {
this.handle_event(&event);
Poll::Ready(Some(Ok(event.into())))
}
Poll::Ready(None) => {
let err = Error::StreamEnded; <-- wouldn't it be more appropriate to have something like an Event::Done?
this.handle_error(&err);
Poll::Ready(Some(Err(err)))
}
Poll::Pending => Poll::Pending,
}

actix-web-lab SSE integration

I am struggling to integrate this crate with actix-web-lab SSE. The event source returns a StreamEnded error after draining the source of all events that were created before the event source made its initial connection request (then subsequent sends by the server will eventually fail because the channel is closed).

Here is a hello world test that fails. Any help would be appreciated.

[dependencies]
actix-rt            = { version = "2.8.0" }
actix-web           = { version = "4.3.1" }
actix-web-lab       = { version = "0.19.1" }
reqwest             = { version = "0.11.18", features = [ "blocking", "json" ] }
reqwest-eventsource = { version = "0.4.0" }
serde               = { version = "1.0.160" }
serde_json          = { version = "1.0.100" }
futures             = { version = "0.3.0" }
//local shortcuts

//third-party shortcuts
use serde::{Serialize, Deserialize};

//standard shortcuts
use futures::stream::StreamExt;
use std::collections::HashSet;
use std::sync::RwLock;
use std::time::Duration;

//-------------------------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------------------------
// SERVER
//-------------------------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------------------------

/// stream event
#[derive(Serialize, Deserialize)]
struct StreamEvent(u64);

//-------------------------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------------------------

/// stream sender (stored in server)
#[derive(Default)]
struct StreamSender
{
    sender: RwLock<Option<actix_web_lab::sse::Sender>>
}

//-------------------------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------------------------

#[actix_web::get("/stream/connect")]
async fn server_event_stream_connector(stream_sender: actix_web::web::Data<StreamSender>) -> impl actix_web::Responder
{
    let (sender, sse_stream) = actix_web_lab::sse::channel(10);
    stream_sender.sender.write().unwrap().replace(sender.clone());
    dbg!("stream initialized");
    return sse_stream;//.with_keep_alive(Duration::from_secs(5));
}

//-------------------------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------------------------

#[actix_web::post("/stream/instigate")]
async fn server_event_stream_instigator(
    stream_sender : actix_web::web::Data<StreamSender>,
    event_value   : actix_web::web::Json<StreamEvent>
) -> impl actix_web::Responder
{
    dbg!("stream instigation received");
    let mut sender_guard = stream_sender.sender.write().unwrap();
    let Some(sender) = sender_guard.as_mut()
    else { panic!("stream sender should be initialized before being instigated in test"); };

    sender.send(actix_web_lab::sse::Data::new_json(event_value.0).unwrap()).await.unwrap();

    actix_web::HttpResponse::Ok()
}

//-------------------------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------------------------

#[actix_web::delete("/stream/close")]
async fn server_event_stream_closer(stream_sender: actix_web::web::Data<StreamSender>) -> impl actix_web::Responder
{
    dbg!("stream closed");
    stream_sender.sender.write().unwrap().take();
    actix_web::HttpResponse::Ok()
}

//-------------------------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------------------------

/// run http server
fn http_server_runner() -> std::io::Result<()>
{
    let stream_sender = actix_web::web::Data::new(StreamSender::default());

    // set up the tokio runtime and run the server inside it
    actix_web::rt::System::new().block_on(
            actix_web::HttpServer::new(
                move ||
                {
                    actix_web::App::new()
                        .app_data(stream_sender.clone())
                        .service(server_event_stream_connector)
                        .service(server_event_stream_instigator)
                        .service(server_event_stream_closer)
                }
            )
            .bind(("127.0.0.1", 7000))?
            .run()
        )
}

//-------------------------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------------------------
// CLIENT
//-------------------------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------------------------

fn make_http_client() -> reqwest::blocking::Client
{
    reqwest::blocking::Client::new()
}

//-------------------------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------------------------

fn make_http_event_stream() -> reqwest_eventsource::EventSource
{
    // make stream
    let mut stream = reqwest_eventsource::EventSource::get("http://127.0.0.1:7000/stream/connect");

    // wait until stream is open
    dbg!(stream.ready_state());
    let first_event = actix_web::rt::System::new().block_on(stream.next()).expect("stream should receive at least one event");
    dbg!(stream.ready_state());
    match first_event
    {
        Ok(reqwest_eventsource::Event::Open) => (),
        _ => panic!("first stream event was not 'stream open'")
    }

    stream
}

//-------------------------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------------------------

fn client_instigate_stream_event(client: &reqwest::blocking::Client, event: StreamEvent)
{
    let _ = client
        .post("http://127.0.0.1:7000/stream/instigate")
        .json(&event)
        .send()
        .expect("sending instigate stream event should not fail in test");
}

//-------------------------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------------------------

fn client_request_stream_closure(client: &reqwest::blocking::Client)
{
    let _ = client.delete("http://127.0.0.1:7000/stream/close").send().expect("sending close stream should not fail in test");
}

//-------------------------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------------------------

/// collect streamed events from http server
async fn collect_streamed_events(mut stream: reqwest_eventsource::EventSource) -> HashSet<u64>
{
    dbg!(stream.ready_state());
    let mut result = HashSet::<u64>::default();

    while let Some(event) = stream.next().await
    {
        dbg!(stream.ready_state());
        match event
        {
            Ok(reqwest_eventsource::Event::Open) => { dbg!("event: stream open"); },
            Ok(reqwest_eventsource::Event::Message(message)) =>
            {
                dbg!("event: event message");
                let stream_event: StreamEvent = serde_json::from_slice(message.data.as_bytes()).unwrap();
                result.insert(stream_event.0);
            }
            // we must manually close the client when the server ends the stream
            Err(reqwest_eventsource::Error::StreamEnded) =>
            {
                dbg!("event: stream ended");
                stream.close()
            }
            Err(_) => panic!("event stream should not error in test (other than stream end)")
        }
    }

    result
}

//-------------------------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------------------------
// TEST
//-------------------------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------------------------

#[test]
fn sse_hello_world()
{
    // launch http server in new thread
    let _ = std::thread::spawn(|| { let _ = http_server_runner(); });

    // make client
    let client = make_http_client();

    // prepare stream between client and server
    let stream = make_http_event_stream();

    // begin collecting future stream of collected events
    let stream_result_joiner = std::thread::spawn(
            move ||
            {
                actix_web::rt::System::new().block_on(
                        collect_streamed_events(stream)
                    )
            }
        );

    // tell server to feed the stream
    client_instigate_stream_event(&client, StreamEvent(10));
    client_instigate_stream_event(&client, StreamEvent(200));
    client_instigate_stream_event(&client, StreamEvent(4));

    // tell server to close the stream
    client_request_stream_closure(&client);

    // collect stream results
    //let stream_result = actix_web::rt::System::new().block_on(stream_result_future);
    let stream_result = stream_result_joiner.join().expect("stream result should appear in test");

    // check result
    assert!(stream_result.contains(&10));
    assert!(stream_result.contains(&200));
    assert!(stream_result.contains(&4));
}

//-------------------------------------------------------------------------------------------------------------------

Connection lost

If the server is shutdown after the connection has been made, there is no way of knowing it on the client side.
Or at least I did not find it.

Closing Connection

I couldn't find a way to stop the client from listening or putting it back into auto-retry mode.

eventsource_stream 404

Hello. You removed the crate eventsource_stream from GitHub, won't you stick with it anymore?

New message ready callback

It would be nice to have a way to use callbacks instead of using .await.
Something like this:

stream.on_message_ready(|event| {
    //called each time a new Poll<...> is ready
    [...]
});

It would not block the thread.
I did not find how to do it the way it currently is.

What error conditions should be handled?

Thanks for providing this project.

I'm curious as to what causes an error to occur when consuming the event source stream and what must therefore should be handled by the application. I've found that browsers handle some errors and retry, but if you get a bad gateway response then the event source errors with them. Is there similar behaviour here?

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.