vertx-rx's Introduction

Rx extension for Vert.x

Vert.x module adding support for Reactive Extensions (Rx) using the Rx libraries. This allows Vert.x developers to use the Rx type-safe composable API to build Vert.x verticles. This module provides helpers for adapting Vert.x stream and future constructs to Rx observables.

Stack integration

RxJava, RxJava2 and RxJava3 are integrated with the Vert.x stack.

Integration tests

Integration tests are placed here:

Integration docs

Integration docs are placed here:

The corresponding component uses an asciidoctor include directive to embed the doc snippet:

Vert.x Web Client example:

is replaced by

The final aggregation happens in the stack generation module.

vertx-rx's Issues

Exceptions within a subscriber stops a consumer

The following code demonstrates the problem. As can be seen, if the subscriber throws an error, the consumer is killed. This was unexpected: I expected consumers to remain subscribed even if a single invocation throws an exception.

        new EventBus(eventBus)
                .consumer("foo")
                .toObservable()
                .subscribe(msg -> {
                    System.out.println("Seen: " + msg.body());
                    if (msg.body().equals(2)) {
                        throw new RuntimeException("error");
                    }
                });

        eventBus.send("foo", 1);
        eventBus.send("foo", 2);
        eventBus.send("foo", 3);
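
One possible way to keep the consumer subscribed is to catch exceptions inside the callback so that they never reach the stream. The sketch below uses only JDK types to show the idea; `SafeCallbacks` and `guarded` are hypothetical names and are not part of vertx-rx or RxJava.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper, not part of vertx-rx or RxJava.
final class SafeCallbacks {

  // Wraps a callback so that an exception thrown for one item is handed to
  // onFailure instead of propagating to the caller (the stream, in Rx terms).
  static <T> Consumer<T> guarded(Consumer<T> callback, Consumer<Throwable> onFailure) {
    return item -> {
      try {
        callback.accept(item);
      } catch (RuntimeException e) {
        onFailure.accept(e);
      }
    };
  }

  public static void main(String[] args) {
    List<Integer> seen = new ArrayList<>();
    List<Throwable> failures = new ArrayList<>();
    Consumer<Integer> subscriber = guarded(body -> {
      seen.add(body);
      if (body == 2) {
        throw new RuntimeException("error");
      }
    }, failures::add);

    // Stand-in for three messages delivered to the same subscriber.
    for (int body = 1; body <= 3; body++) {
      subscriber.accept(body);
    }
    System.out.println("Seen: " + seen + ", failures: " + failures.size());
  }
}
```

With the API from the report, the guarded callback would presumably be the one passed to subscribe(...). Whether silently skipping the failed message is acceptable depends on the application.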

Missing calls to RxJavaSchedulersHook.onSchedule when using RxHelper.schedulerHook

Hi vertx-rx team!

In plain RxJava, RxJavaSchedulersHook.onSchedule gets called by ScheduledAction.scheduleActual(...). This hook is important because it allows applications to transfer ThreadLocal values from the scheduling thread to the executing thread.

When using RxHelper.schedulerHook (to have Schedulers.io() operations run in a Vert.x worker thread), the calls to the RxJavaSchedulersHook.onSchedule method never happen. I assume they need to be added to the ContextScheduler.WorkerImpl.schedule(...) methods.

Best regards,
Timo
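
To illustrate why a scheduling hook matters for ThreadLocal values, here is a minimal JDK-only sketch of the capture-and-restore pattern such a hook is typically used for. `ContextPropagation`, `REQUEST_ID` and the local `onSchedule` method are hypothetical names for this example; the real hook operates on RxJava actions rather than on `Runnable`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration; names are not taken from RxJava or vertx-rx.
final class ContextPropagation {

  static final ThreadLocal<String> REQUEST_ID = new ThreadLocal<>();

  // Captures the caller's thread-local value when the task is scheduled and
  // restores it on whichever thread eventually runs the task.
  static Runnable onSchedule(Runnable task) {
    final String captured = REQUEST_ID.get();
    return () -> {
      String previous = REQUEST_ID.get();
      REQUEST_ID.set(captured);
      try {
        task.run();
      } finally {
        REQUEST_ID.set(previous);
      }
    };
  }

  // Schedules a task on a worker thread and returns the value it observed there.
  static String observedOnWorker(String value) {
    ExecutorService worker = Executors.newSingleThreadExecutor();
    REQUEST_ID.set(value);
    try {
      AtomicReference<String> observed = new AtomicReference<>();
      CompletableFuture
          .runAsync(onSchedule(() -> observed.set(REQUEST_ID.get())), worker)
          .join();
      return observed.get();
    } finally {
      REQUEST_ID.remove();
      worker.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(observedOnWorker("req-42"));
  }
}
```

If the scheduler never calls the hook, the wrapping step is skipped and the worker thread sees no value, which matches the behaviour described in the report.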

Unnecessary wrapping of handlers while invoking setters on delegate

Issue was originally created at: eclipse-vertx/vert.x#1463 - Moving to vertx-rx repository.

While consuming vertx-rx-java, I see that the handlers set on the delegate are wrappers around the input parameters. This seems to pose two issues:

  1. The input parameters in rxjava as well as core packages are of io.vertx.core.Handler type. Still, in rxjava package, we wrap it to the same type through constructor before passing it to delegate. This creates unnecessary objects.
  2. If null is passed as a parameter to rxjava package, it results in NPE in core package.

Example:
io.vertx.rxjava.core.http.HttpServerResponse#drainHandler, closeHandler


  public HttpServerResponse drainHandler(Handler<Void> handler) { 
    ((io.vertx.core.http.HttpServerResponse) delegate).drainHandler(new Handler<java.lang.Void>() {
      public void handle(java.lang.Void event) {
        handler.handle(event);
      }
    });
    return this;
  }

My use case expects to wait for some data to arrive from an external source and write it to client. I register to drain handler when I have excess amount of data and write queue gets full. When I do not have data to write (as I wait for my source to send some), drain events make no sense and I unregister by setting drain handler to null. Though my overall functionality is not impacted, there are NPEs thrown from vertx and additional objects are created.
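
A minimal sketch of the two behaviours the report asks for, using a local stand-in for io.vertx.core.Handler so that it compiles on its own. `HandlerWrapping`, `passThrough` and `adapt` are hypothetical names; this is not the code the generator emits.

```java
import java.util.function.Function;

// Hypothetical sketch; Handler here is a local stand-in for io.vertx.core.Handler.
final class HandlerWrapping {

  interface Handler<E> {
    void handle(E event);
  }

  // Same event type on both sides: hand the handler over unchanged, so no
  // extra object is created and null (meaning "unregister") stays null.
  static <E> Handler<E> passThrough(Handler<E> handler) {
    return handler;
  }

  // Different event types: wrap only when there is something to wrap.
  static <A, B> Handler<A> adapt(Handler<B> handler, Function<A, B> convert) {
    if (handler == null) {
      return null;
    }
    return event -> handler.handle(convert.apply(event));
  }

  public static void main(String[] args) {
    Handler<Void> none = passThrough(null);
    System.out.println("null stays null: " + (none == null));
  }
}
```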

Current Rx api for HttpClient is not composable with idiomatic RxJava

It's a common practice to strive for composable RxJava operator chains that might look something like this:

void someFunction(JsonObject theData) {
   Observable.just(theData)
      .flatMap( json -> getSomeObject( json )  )
      .flatMap( someObject -> transformSomeObject(someObject) )
      .subscribe (
         transformed -> { handleTransformedObject( transformed ) },
         error -> { handleError(error) }
      );
}

Observable<SomeClass> getSomeObject(JsonObject theData) {
   return Observable.just( theData )
          // the lambda parameter is named json so it does not shadow theData
          .flatMap( json -> { /* issue http request, emit response observable */ } )
          .flatMap( httpResponse -> { /* emit response body observable */ } )
          .flatMap( responseBody -> { /* emit SomeClass observable */ } );
}

...other functions

Calls to getSomeObject() could then be used in any chain where retrieval of a SomeClass observable is needed, given an input JsonObject, and the chain author does not need to know anything about how the instance of SomeClass was obtained.

The implementation of getSomeObject above can't be realized with the current Rx api for the HttpClient, because it requires a call to HttpRequest.end() to be placed outside the operator chain, after the call to subscribe.

This issue is to request that a more RX-idiomatic api for the HttpClient be considered so it can fit more naturally into RxJava-oriented applications.

Doc has a typo

In the WebSocket section, the website has a rendering problem:

« The {@link io.vertx.rxjava.core.http.HttpServer#websocketStream()` provides a callback for each incoming connection: »

ObservableHandler can miss events depending on ordering

Vertx 3.3.3

A good example of this is doing a get from the Hazelcast version of MultiMap (HazelcastAsyncMultiMap). Let's say we have something like this (subs is an AsyncMultiMap):

            def observeGet = RxHelper.<AsyncResult<ChoosableIterable<ServerID>>>observableHandler()
            return observeGet.map {
                if (it.failed()) {
                    throw it.cause()
                }
                return new MapEntry(address, Lists.newArrayList(it.result()))
            }.doOnSubscribe() {
                subs.get(address, observeGet.toHandler())
            }

When the observable returned here is subscribed to (this happens to be from inside a flatMap call, but you could create and subscribe to it directly), we call get. If the get request is fulfilled from the cache (line 92 of HazelcastAsyncMultiMap), handle is called on ObservableHandler immediately. Since we aren't yet fully subscribed (the subscribe call on the handler doesn't happen until after the doOnSubscribe callback), we end up discarding the event. Then, when the subscribe call occurs, we simply call fireComplete and the event emission is lost.

There doesn't appear to be a way to guarantee that the call to handle won't happen before the call to subscribe in this case. I've even tried the experimental doOnRequest, and that's not late enough either. About the only choice is to put the call into some kind of deferred async request, and even that's not 100% guaranteed to work. It would be better if in this case ObservableHandler kept track of the early event and emitted it at the point of subscription. I'm planning to add that code in our environment and I would be happy to provide it back if that helps.
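
The idea of keeping track of an early event and emitting it at subscription time could look roughly like the following JDK-only sketch. `ReplayingHandler` is a hypothetical class written for this illustration and is not the actual ObservableHandler implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch, not the actual ObservableHandler implementation.
final class ReplayingHandler<T> {
  private final List<T> early = new ArrayList<>();
  private Consumer<T> subscriber;

  // Called by the event source; may run before subscribe().
  synchronized void handle(T event) {
    if (subscriber == null) {
      early.add(event); // keep the event instead of discarding it
    } else {
      subscriber.accept(event);
    }
  }

  // Registers the subscriber and replays anything that arrived early.
  synchronized void subscribe(Consumer<T> consumer) {
    subscriber = consumer;
    for (T event : early) {
      consumer.accept(event);
    }
    early.clear();
  }

  public static void main(String[] args) {
    ReplayingHandler<String> handler = new ReplayingHandler<>();
    handler.handle("served from cache");   // arrives before subscription
    handler.subscribe(System.out::println); // replayed here
  }
}
```

The sketch buffers without bound, so a real implementation would need to decide how many early events to retain.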

RxJava delegate

Consider using the raw type for the delegate instead of Object (using @{type.raw.name}).

Observable<HttpClientResponse> should end() on subscribe()

Hello vert-rx,

a code sample in the official examples repo is as follows:

    public void httpClientRequest(Vertx vertx) {
        HttpClient client = vertx.createHttpClient(new HttpClientOptions());
        HttpClientRequest request = client.request(HttpMethod.GET, 8080, "localhost", "/the_uri");
        request.toObservable().subscribe(response -> {
                // Process the response
        }, error -> {
                // Could not connect
        });
        request.end();
    }

In the sample, the subscribe() on the observable is analogous to the handler(...) in the non-rx world. However, in my opinion, the subscribe itself should trigger the end() by itself. This was the case with mod-rxvertx in Vert.x 2.

My usecase (simplified) is/was a function as follows:

public Observable<JsonObject> requestJsonResource(HttpClient client, String url) {
    return client.get(url).toObservable().someMapReduceMagicToTransformResponseToJson(...);
}

The caller of the function then called subscribe() and got an Observable of a JsonObject. Now he would have to end the request, but as the request is managed under the hood, this is not possible.

I then tried doing the following

    ...
    HttpClientRequest request = client.get(url);
    return request.toObservable().doOnSubscribe(request::end).someTransformation(...)

But as the doOnSubscribe callback fires before the subscription happens, I get an exception saying that no handler is registered for the request that is about to be sent.

I understand that there are use cases where you want to end() the request separately, but in my opinion either the doOnSubscribe variant should work, or a subscribe() should trigger the end() automatically, as with Vert.x 2's mod-rxvertx.
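
The ordering being asked for (register the response handler first, then end the request, and do both only when someone subscribes) can be modelled with JDK types alone. In the sketch below, `LazyRequest`, `FakeRequest` and `deferred` are hypothetical stand-ins; `FakeRequest` imitates only the "no handler registered" failure from the report and is not the Vert.x HttpClientRequest.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical model of "end the request on subscription".
final class LazyRequest {

  // Minimal stand-in for a request: sending it without a handler fails.
  static final class FakeRequest {
    private Consumer<String> handler;

    void handler(Consumer<String> responseHandler) {
      this.handler = responseHandler;
    }

    void end() {
      if (handler == null) {
        throw new IllegalStateException("no response handler registered");
      }
      handler.accept("response");
    }
  }

  // Nothing is created or sent until get() is called on the returned supplier.
  // On each call the handler is registered first and only then is end() invoked.
  static Supplier<CompletableFuture<String>> deferred(Supplier<FakeRequest> requestFactory) {
    return () -> {
      FakeRequest request = requestFactory.get();
      CompletableFuture<String> result = new CompletableFuture<>();
      request.handler(result::complete);
      request.end();
      return result;
    };
  }

  public static void main(String[] args) {
    Supplier<CompletableFuture<String>> lazy = deferred(FakeRequest::new);
    System.out.println(lazy.get().join());
  }
}
```

Because the caller only ever sees the deferred value, the request object can stay hidden inside a function such as requestJsonResource.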

missing deployment methods on io.vertx.rxjava.core.Vertx

Some important methods on the Vertx object are gone:

public Observable<String> deployVerticleObservable(String name, DeploymentOptions options)
used to be present in earlier versions. Same for the undeploy-variant of this method.

The current workaround involves getting a reference to the Vertx delegate and calling void deployVerticle(String name, DeploymentOptions options, Handler<AsyncResult<String>> completionHandler); with an ObservableFuture as the third parameter.

These workaround gymnastics are painful.

Generator issue on Handler of VertxGen

Let's take this method:

Handler<ServerWebSocket> webSocketHandler();

This breaks the compilation as the generated RX-ified version of the class containing this issue is not properly generated:

 /Users/clement/Projects/vert.x/vertx-stomp/src/main/generated/io/vertx/rxjava/ext/stomp/StompServer.java:[313,66] incompatible types: io.vertx.core.Handler<io.vertx.core.http.ServerWebSocket> cannot be converted to io.vertx.core.Handler<io.vertx.rxjava.core.http.ServerWebSocket>

Provide CharSequence API for RoutingContext headers

In RxJava RoutingContext object there is no function that accepts CharSequence objects.
For example, on the normal version for vertx-web:

routingContext.request().getHeader(HttpHeaders.AUTHORIZATION); 

Would work, while in the rx version:

routingContext.request().getHeader(HttpHeaders.AUTHORIZATION.toString()); 

The same issue is true for MultiMap#add.

RxHelper.schedulerHook doesn't respect verticle contexts

While running an application that uses a complex combination of delaySubscription and buffer operators, we started getting warnings that an HttpClient was probably being shared between verticles, along with random concurrent modification exceptions that should never happen.

As it turns out, Rx observables that used a specific scheduler were being run on a different event loop than expected.

Here's a simple reproducer:

public class Test extends AbstractVerticle {
    private final String name;

    public Test(final String name) {
        super();
        this.name = name;
    }

    @SuppressWarnings("uncommentedmain")
    public static void main(final String[] args) throws Throwable {
        final Vertx vertx = Vertx.vertx();
        final RxJavaSchedulersHook hook = RxHelper.schedulerHook(vertx);
        RxJavaPlugins.getInstance().registerSchedulersHook(hook);

        vertx.deployVerticle(new Test("a"));
        vertx.deployVerticle(new Test("b"));

        Thread.sleep(1000); // sleep is static; give the verticles time to run

        vertx.close();
    }

    @Override
    public void start() throws Exception {
        log("Verticle on thread %s", Thread.currentThread().getName());
        vertx.setTimer(100, this::timer);
    }

    private void timer(final long id) {
        log("Timer exec on thread %s", Thread.currentThread().getName());
        Observable.just("1", "2", "3").delaySubscription(20, TimeUnit.MILLISECONDS)
            .subscribe(item -> log("iteration %s on thread %s", item, Thread.currentThread().getName()));
    }

    private void log(final String fmt, final Object... args) {
        System.out.println("[" + name + "] " + String.format(fmt, args));
    }
}

If you run it, it produces something along these lines:

[a] Verticle on thread vert.x-eventloop-thread-1
[b] Verticle on thread vert.x-eventloop-thread-3
[a] Timer exec on thread vert.x-eventloop-thread-1
[b] Timer exec on thread vert.x-eventloop-thread-3
[a] iteration 1 on thread vert.x-eventloop-thread-1
[a] iteration 2 on thread vert.x-eventloop-thread-1
[a] iteration 3 on thread vert.x-eventloop-thread-1
[b] iteration 1 on thread vert.x-eventloop-thread-1
[b] iteration 2 on thread vert.x-eventloop-thread-1
[b] iteration 3 on thread vert.x-eventloop-thread-1

Note that while each verticle runs on a different event-loop thread, the subscriptions all run on the same event loop.

The problem seems to be on io.vertx.rx.java.ContextScheduler.TimedAction.execute:

      private void execute(Object o) {
        if (blocking) {
          vertx.executeBlocking(this::run, NOOP);
        } else {
          context.runOnContext(this::run);
        }
      }

As the context is determined when the scheduler is created, it is fixed, and RxHelper.schedulerHook(Vertx) creates the schedulers once and uses them everywhere.

Changing the line context.runOnContext(this::run); to vertx.runOnContext(this::run); fixed my use case, but I'm not sure whether other cases break, since this is the only place where the context field is used.

Unmarshal Jackson TypeRef

Rest calls can return arrays and for such cases it is useful to be able to unmarshal Jackson TypeReferences.

Rxified buffers should implement ClusterSerializable

A core Buffer can be stored in a core LocalMap, but the rxified Buffer cannot.

Sample code:

    LocalMap<Long, Buffer> covers = rc.vertx().sharedData().getLocalMap("covers");
    Buffer cached = covers.get(albumId);
    if (cached != null) {
      rc.response().end(cached);
      return;
    }

    download(rc, albumId)
      .doOnSuccess(buffer -> covers.put(albumId, buffer))
      .subscribe(rc.response()::end, rc::fail);

Variant of RxHelper.unmarshaller that can use a custom mapper

We use jackson-dataformat-yaml to unmarshal YAML content to a POJO. It would be very nice if the unmarshaller had a variant that accepted a custom mapper and passed it on to the relevant operator.

I can take a stab at implementing this and send a PR if that is OK. It should not be that hard.

Passing null instead of parameter

When a method passes a Class parameter, the RX generated class passes null to the delegated method:

If you have this method:

<T> T dumb(Class<T> tClass);

It generates:

public <T> T dumb(Class<T> tClass) { 
    T ret = (T) delegate.dumb(null);
    return ret;
  }

As you can imagine it's a bit annoying...

The culprit is: vertx-rxjava/template/common.templ line 125:

...
    return 'null';
}

Instead of

    return expr;
}

This bug is blocking vert-x3/vertx-service-discovery#14.

Premature disposal of HTTP client response handlers

There's a bug in the Vert.x RxJava integration that causes requests to hang instead of being processed. It happens when both io.vertx.rxjava.core.http.HttpClientRequest#toObservable() and io.vertx.rxjava.core.http.HttpClientResponse#toObservable() are used.

When a response is processed, io.vertx.core.http.impl.HttpClientRequestImpl#handleResponse(HttpClientResponseImpl) is used to call two handlers:

  • respHandler which sets up HttpClientResponseImpl#handler and HttpClientResponseImpl#endHandler through ReadStreamAdapter#onSubscribed() method
  • endHandler which removes HttpClientResponseImpl#handler and HttpClientResponseImpl#endHandler through ReadStreamAdapter#onUnsubscribed() method

The ReadStreamAdapter described above has its adapter property set to an instance of HttpClientResponse.

Reproduction of a bug: https://github.com/meshuga/vertx-rx-bug.

As for now, I just use the code below instead of io.vertx.rxjava.core.http.HttpClientRequest#toObservable():

AsyncSubject<HttpClientResponse> subject = AsyncSubject.create();
HttpClientRequest req = makeRequest(...);

req.handler(resp -> {
  subject.onNext(resp);
  subject.onCompleted();
});
req.exceptionHandler(subject::onError);

I don't know why ReadStreamAdapter had been implemented, however I would recommend changing it to plain Rx subjects, that is AsyncSubject and PublishSubject (less pain).

Cannot delay HTTP response

I've hit a subtle issue that doesn't allow me to delay a response to the client.
In the real code (this is a simplification) I'm using the RxJava extensions to wait for a particular condition before sending a response to the client, but the problem happens also with the following code.

        // RxJava - vertx
        HttpServer server = vertx.createHttpServer();

        server.requestStream().toObservable()
                //.delay(1, TimeUnit.MILLISECONDS)
                .subscribe(req -> {
                    req.handler(data -> logger.info("RECEIVED " + data)); // Important
                    req.response().setStatusCode(200).setChunked(true).write("Hello World").end();
                });

        server.listen(9000);

The code above works perfectly with the delay commented out. When sending GET or POST requests to port 9000, the body is printed to the console (in the case of POST requests, I'm sending them using Postman) and the message "Hello World" is returned to the client.

If you just put a delay into the observable (any amount of time causes the problem), the issue appears:

Exception in thread "RxComputationScheduler-1" java.lang.IllegalStateException: Exception thrown on Scheduler.Worker thread. Add `onError` handling.
	at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:57)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: rx.exceptions.OnErrorNotImplementedException: Request has already been read
	at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:386)
	at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:383)
	at rx.internal.util.ActionSubscriber.onError(ActionSubscriber.java:44)
	at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:152)
	at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:115)
	at rx.exceptions.Exceptions.throwOrReport(Exceptions.java:205)
	at rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:139)
	at rx.internal.operators.OperatorDelay$1$3.call(OperatorDelay.java:87)
	at rx.internal.schedulers.EventLoopsScheduler$EventLoopWorker$2.call(EventLoopsScheduler.java:189)
	at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
	... 7 more
Caused by: java.lang.IllegalStateException: Request has already been read
	at io.vertx.core.http.impl.HttpServerRequestImpl.checkEnded(HttpServerRequestImpl.java:426)
	at io.vertx.core.http.impl.HttpServerRequestImpl.handler(HttpServerRequestImpl.java:206)
	at io.vertx.rxjava.core.http.HttpServerRequest.handler(HttpServerRequest.java:74)
	at io.kubeless.server.VertxBug.lambda$start$1(VertxBug.java:41)
	at rx.internal.util.ActionSubscriber.onNext(ActionSubscriber.java:39)
	at rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:134)
	... 10 more

The problem happens on the first GET request. If I send only POST requests, the problem happens on the second request, while the first works.

The offending line seems to be the one marked as "important" (i.e. reading the content of the request and logging it or doing anything else with it).

I think that this should be solved, but I'm looking also for a workaround (eg. is there a way to buffer the request content to be able to read it multiple times without errors?).

Missing call to this.fireComplete() in ObservableHandler

In the ObservableHandler<T> class you have

...
 public ObservableHandler() {
    this(new HandlerAdapter<T>() {
      @Override
      protected void dispatch(T event) {
        this.fireNext(event);
        //this.fireComplete() should be called here
      }
    });
  }
...

I think the dispatch(T event) implementation should also call this.fireComplete(), as per the commented line above, similarly to the ObservableFuture class. Otherwise the Observer's state is not updated.
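
For a handler that is known to fire exactly once, the emit-then-complete behaviour could be sketched as below. `SingleShotHandler` and its nested `Observer` interface are hypothetical and written only for this example. Completing after the first event would not suit handlers that are invoked repeatedly, which may be why the existing class does not do it.

```java
// Hypothetical sketch of "fire next, then complete" for one-shot handlers.
final class SingleShotHandler<T> {

  // Minimal observer contract used only for this sketch.
  interface Observer<V> {
    void onNext(V value);
    void onCompleted();
  }

  private final Observer<T> observer;
  private boolean done;

  SingleShotHandler(Observer<T> observer) {
    this.observer = observer;
  }

  // Emits the event and then completes, so downstream sees a terminated stream.
  void handle(T event) {
    if (done) {
      return; // ignore anything delivered after completion
    }
    done = true;
    observer.onNext(event);
    observer.onCompleted();
  }

  public static void main(String[] args) {
    SingleShotHandler<String> handler = new SingleShotHandler<>(new Observer<String>() {
      @Override
      public void onNext(String value) {
        System.out.println("next: " + value);
      }

      @Override
      public void onCompleted() {
        System.out.println("completed");
      }
    });
    handler.handle("event");
  }
}
```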
