
MicroProfile Reactive Streams Operators

Rationale

Reactive Streams is an integration SPI - it allows two different libraries that provide asynchronous streaming to be able to stream data to and from each other.

Reactive Streams is not, however, designed to be used directly by application developers. The semantics defined by Reactive Streams are strict and non-trivial to implement correctly, particularly in the area of thread safety. Typically, application developers are expected to use third party libraries that provide the tools necessary to manipulate and control streams. Examples include Akka Streams, RxJava and Reactor.

Depending on third party libraries for this essential application developer facing functionality however is not something that MicroProfile can do. MicroProfile specifications are not allowed to depend on anything other than the JDK and other MicroProfile specifications.

Hence, for MicroProfile to provide application developer APIs that use Reactive Streams, MicroProfile must provide its own Reactive Streams manipulation and control library. In the future, it is hoped that this library will be adopted by the JDK itself, after a period of suitable incubation in the MicroProfile project. Some JDK maintainers have indicated that this would be a suitable path to get this type of functionality into the JDK.

Influences and History

The naming and scope of this API is inspired by the JDK8 java.util.stream API. The JDK8 stream API however does not capture all typical functionality that an application developer using Reactive Streams would need. For additional API naming and scope, Akka Streams, RxJava and Reactor have been used as an inspiration.

Implementations

MicroProfile Reactive Streams does not contain an implementation itself but only provides the specified API, a TCK and documentation.

The following implementations are available:

Design

MicroProfile Reactive Streams offers a series of builders for creating instances of Reactive Streams Publisher, Subscriber and Processor. Subscribers are associated with a CompletionStage of the result of consuming the stream; this may be the result of a reduction on the elements, or in some cases may simply indicate the termination of the stream, either as a success or with an error.
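
The idea of a subscriber that carries a CompletionStage of the stream's result can be sketched with plain JDK types. This is a hypothetical illustration built on the JDK9 java.util.concurrent.Flow interfaces, not the actual MicroProfile API; the class name CollectingSubscriber is invented for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;

// Hypothetical sketch: a subscriber whose result (here, a list of all
// elements) is redeemed via a CompletionStage when the stream terminates.
class CollectingSubscriber<T> implements Flow.Subscriber<T> {
    private final List<T> items = new ArrayList<>();
    private final CompletableFuture<List<T>> completion = new CompletableFuture<>();

    // The result of consuming the stream.
    public CompletionStage<List<T>> getCompletion() {
        return completion;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        subscription.request(Long.MAX_VALUE); // unbounded demand, fine for a sketch
    }

    @Override
    public void onNext(T item) {
        items.add(item); // the reduction: collect every element
    }

    @Override
    public void onError(Throwable error) {
        completion.completeExceptionally(error); // stream terminated with an error
    }

    @Override
    public void onComplete() {
        completion.complete(items); // stream terminated successfully
    }
}
```

In the real API, builders produce such subscribers for common reductions (toList, reduce, forEach and so on) so that users never implement the Reactive Streams interfaces by hand.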

The API builds a graph from a series of graph stages, which are provided as an SPI. A Reactive Streams engine, which is implemented by implementations of the specification, and can also manually be provided by end users, is responsible for building the graph into a running stream.

Reactive Streams is available in JDK9 in the java.util.concurrent.Flow API, however, MicroProfile is not ready to require a baseline JDK version above 8. For this reason, the same interfaces provided by https://www.reactive-streams.org in the org.reactivestreams package are used instead. This does place a dependency from MicroProfile to a third party library, however, that third party library is nothing more than the four Reactive Streams interfaces (Publisher, Subscriber, Subscription and Processor), and these have been copied as is into JDK9. As an interim solution while JDK9 adoption catches up, this has been deemed an acceptable exception to the rule. The approach documented in MicroProfile Approach to Reactive for ensuring a smooth transition to the JDK9 APIs has been adopted.
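
For reference, the entire third party dependency amounts to these four interfaces, shown here without their javadoc (nested in a holder class so the sketch is self-contained; in the real library they are top-level types in the org.reactivestreams package):

```java
// The four Reactive Streams interfaces, as published at
// https://www.reactive-streams.org and copied as-is into JDK9's
// java.util.concurrent.Flow.
final class ReactiveStreamsInterfaces {

    public interface Publisher<T> {
        void subscribe(Subscriber<? super T> subscriber);
    }

    public interface Subscriber<T> {
        void onSubscribe(Subscription subscription);
        void onNext(T item);
        void onError(Throwable throwable);
        void onComplete();
    }

    public interface Subscription {
        void request(long n); // signal demand for up to n more elements
        void cancel();        // stop receiving elements
    }

    // A Processor is simultaneously a Subscriber and a Publisher.
    public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}
}
```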

Building

The whole MicroProfile Reactive Streams project can be built via Apache Maven.

$> mvn clean install

Diagrams

The API documentation makes extensive use of marble diagrams to visualize what each operator does. These diagrams are generated using a DSL written specifically for this purpose, rather than creating them in a visual diagram editor, which would make it very difficult to ensure consistency, especially between different contributors.

The DSL is implemented in JavaScript, and the diagrams are created using SVG. This combination of technologies allows rapid development of the diagrams, since they can be run, inspected, debugged and tweaked directly in a web browser. They are then exported to PNG files using Puppeteer, a high level JavaScript API for controlling headless Chrome, running on node.

Editing diagrams

All the diagrams are declared in mp-rs-ops-marbles.js. This contains a map of all the graphs. Each graph has an array of stages, and stages have zero to many elements. The stages are as follows:

  • ins - An input stream. Called ins because in is a JavaScript keyword, and ins makes the width 3 characters for nice alignment with other stages. The first parameter may optionally be a map of options; the only option currently read is label, which is used to label the stage.

  • sub - A sub stream. This will automatically have a label generated for it, stream[n], where n is the number of the sub stage, starting from one.

  • out - An output stream, the difference between out and ins is that out marbles inherit their colour from the previous ins marble.

  • op - The operator stage. The first argument must be the operator, this may be followed by zero to many intermediate value marbles.

  • eff - An effect stream. Used to visualize side effects (eg, callbacks executed by forEach).

  • res - A result stage. This displays the result that gets redeemed by a CompletionSubscriber when it completes.

Each of the stages supports zero to many marbles, which are declared as follows:

  • n - an ‘onNext’ marble. A regular element passed through the stream. The colour is automatically computed - each marble in an ins stream gets a new colour, while all other marbles have their colour selected based on the previous ins marble, searching up vertically first, then left. May optionally take an options argument, supporting a link option which indicates that this marble should be linked to another marble (eg as a feedback loop from the output back to the input).

  • term - an onComplete marble, indicating successful termination of the stream.

  • nterm - a hybrid between n and term, allowing a marble to also carry a termination signal. This is used to make it clear that when a certain marble is emitted, that is the last marble and the stream will be terminated immediately.

  • err - an onError marble, indicating failed termination of the stream.

  • e - a side effect. Should usually contain an example callback invocation.

  • i - an intermediate value. This is used for operators that compute intermediate values that are fed back into the operator callbacks with the next element.

  • r - a result value. This is used with the res stage, to indicate the value that gets redeemed by the result CompletionSubscriber.

  • none - No marble. Inserts a blank space between marbles. Needed to ensure alignment of cause and effect marbles.

After editing or creating a new diagram, you can test your changes by opening them in a browser. Before doing this for the first time, you need to run npm install in the api module to ensure the JavaScript dependencies are installed. This will be done automatically if you run mvn process-resources (or any lifecycle phase after process-resources, such as compile or install) - the build uses the Maven frontend plugin to install Node and npm, then runs npm to install the dependencies. Included in the dependencies is an installation of Chromium, which is used to generate the PNG diagrams for inclusion in the javadocs; this may take a while to download.

Once the dependencies are installed, you can open api/src/docs/js/index.html, which will show you all the rendered diagrams. No generation step is required to view these diagrams; you can simply hit refresh in the browser after making any changes.

Generating diagrams

We convert the diagrams to SVG, then to PNG, using Puppeteer, a high level API on top of Chrome running in headless mode. The SVG diagrams are generated in Chrome, and then screenshotted by Puppeteer to create the PNGs. However, we can’t run this as a part of the regular build because the MicroProfile CI and release server does not have the necessary dependencies to run Chrome. We’ve investigated a variety of different alternatives, including different strategies for generating the diagrams, but nothing viable has come up, and unfortunately installing shared libraries in the Eclipse CBI is too high a maintenance burden for the Eclipse CBI maintainers, so they’ve declined to do it. Consequently, we need to check the diagrams into git, which means whenever they are changed, they need to be manually regenerated.

To generate the diagrams, run:

mvn -Pmarble-diagrams clean package

The diagrams will be saved to api/src/main/java/org/eclipse/microprofile/reactive/streams/doc-files, from there they can be included in the javadocs using an image tag, eg:

<img src="doc-files/map.png" alt="map marble diagram">

Make sure to include the alt text; the CI build will fail if it’s not there.

You can then view the diagrams in the api docs by opening api/target/apidocs/index.html, and navigating to the class that you added the marble diagram to.

Before committing your changes, make sure to use the above command to generate the diagrams, and then check the results into git, including the updated marble-diagram-hashes.json file. As part of the verification of the build, a task checks that the hashes of all the input and output files from the diagram generation process match the hashes recorded when the diagrams were last generated. Failing to do this will cause the build to fail in CI, so it won’t pass PR validation.

Contributing

Do you want to contribute to this project? Find out how you can help here.

Contributors

anthonyvdotbe, azquelt, cescoffier, danielkec, dependabot[bot], eclipse-microprofile-bot, eclipsewebmaster, emily-jiang, gitter-badger, hutchig, jroper, kabir, kenfinnigan, matzew, otaviojava, phillip-kruger, stuartwdouglas, tevans78


Issues

Rename @Incoming and @Outgoing `topic`

Right now the @Incoming and @Outgoing annotations have a topic attribute. The word topic does not always make sense depending on the transport layer.

We can change it to:

  • value - so we can use @Incoming("my-source")
  • from and to:
@Incoming(from="my-source")
@Outgoing(to="my-destination")
public PublisherBuilder<String> doSomething(Publisher<Integer> source) {
    // ...
}

QUESTION: Message with payload but no meta-data...

One of the values of the Message abstraction is to create a structure that can encapsulate data along with supplemental information about that data. Some call it headers, some properties, some meta-data; basically some key/value structure (Map).
Looking at the current Message I only see a payload. Is there a reason for omitting it?

public interface Message<T> {

  /**
   * The payload for this message.
   */
  T getPayload();
  
  /**
   * Supplemental Information about this message
   */
  Map<String, Object> getHeaders();
}

Wrapping RS Elements using lazyBuilders - You can't use Subscribers/Processors more than once

In issue #105, in this comment, @jroper discussed the problem: as subscribers are single use, what do we do if we have a Graph that imports a Subscriber (or Processor) and then we try to run/materialize it more than once? For example, if that Subscriber object got an onComplete() call, which 'run' would it be for?

The obvious answer is: instead of importing a Subscriber instance, we import a means of getting a Subscriber (and similarly for Processor/Publisher, in order to keep the API consistent). Java has a pattern for this: a Supplier. Of course, we already have a means to produce a Subscriber in the API: the SubscriberBuilder. So we probably want one of these that will attempt to supply a 'fresh' Subscriber each time it is run/materialized, perhaps a 'lazy' SubscriberBuilder, or as James termed it:

SubscriberBuilder ReactiveStreams.lazySubscriberBuilder(Supplier<SubscriberBuilder>)

Stream splitting

We might want to introduce some stream splitting functions. For example, a splitWhen function.

This functionality is generally only useful when doing parsing, where you parse some sort of header out of the stream, followed by a sub stream, then another header, etc.

Generally, the element that is split at needs to be accessed before consuming the sub stream. So the sub stream should make that element available, along with the rest of the stream. For example:

PublisherBuilder<SubStream<T>> splitWhen(Predicate<? super T> predicate);
class SubStream<T> {
  final T first;
  final PublisherBuilder<T> rest;
}

So, for each element where the predicate holds true (including the first element of the stream regardless of whether the predicate holds true or not), a SubStream is emitted, with first being that element, and rest being the remainder of the stream up until, but not including, the next element for which the predicate holds true.
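
The grouping rule above can be sketched on a plain list, ignoring the reactive plumbing. This is a hypothetical, non-reactive illustration of the proposed semantics; the class and method names are invented for the sketch, and each resulting group's head plays the role of SubStream.first while its tail plays the role of SubStream.rest.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch of the splitWhen grouping rule: every element
// matching the predicate (and the first element of the stream
// regardless) starts a new sub stream.
class SplitWhenSketch {

    static <T> List<List<T>> splitWhen(List<T> elements, Predicate<? super T> predicate) {
        List<List<T>> subStreams = new ArrayList<>();
        for (T element : elements) {
            if (subStreams.isEmpty() || predicate.test(element)) {
                subStreams.add(new ArrayList<>()); // start a new sub stream
            }
            subStreams.get(subStreams.size() - 1).add(element);
        }
        return subStreams;
    }
}
```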

Peek stage

We could add a peek method, which performs a side effect on each element in the stream without modifying the stream in any way. This is equivalent to the JDK8 Stream.peek.
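
For reference, this is the JDK8 Stream.peek behaviour the proposal mirrors: a side effect per element, with the elements themselves passed through unchanged. The helper method here is invented purely to demonstrate it.

```java
import java.util.List;
import java.util.stream.Collectors;

// Demonstrates Stream.peek: the log receives each element as a side
// effect, while the pipeline itself continues with the same elements.
class PeekSketch {

    static List<Integer> doubledWithLog(List<Integer> input, List<Integer> log) {
        return input.stream()
            .peek(log::add)     // side effect only, stream is unmodified
            .map(i -> i * 2)
            .collect(Collectors.toList());
    }
}
```

A reactive peek would behave the same way, except the side effect runs as each element flows through the stream.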

Spec clarification of @Inject @Producer - result same as ServiceLoad?

The spec has support for service loading an engine and for @injecting an engine in the context
of a container.

Should these always return the same result?
This has the practical implication that the @Inject @Producer method needs to use (a mechanism similar to) ServiceLoader.load to get the engine, thus enabling a user provided Engine, higher in the classpath, to be injected.
Or should @Inject always result in the 'default' engine, even if the user has provided one that is 'first' in the list found by the ServiceLoader?

The use case of a user's impl decorating/delegating to the default impl can be met by the fact that the ServiceLoader mechanism returns a 'set' of matches that can be iterated.

I would probably vote for @Inject and the ServiceLoader returning consistent results. The interesting thing here is that the spec says what is injected is 'ApplicationScoped', whereas one would expect what comes back from the ServiceLoader to be dependent scoped. Hmm?

The spec has:

"If a MicroProfile container provides an implementation of MicroProfile Reactive Streams
Operators, then it must make an application scoped ReactiveStreamsEngine available for injection."

and

"To run this TCK, the container must provide a ReactiveStreamsEngine to be tested as an injectable
ApplicationScoped bean, and the MicroProfile Reactive Streams Operators API must be on the
classpath."

and
in the javadoc...
https://github.com/eclipse/microprofile-reactive-streams/blob/9c88b3fdb6e65c63ca3c40f5609d965c7b3e92f8/api/src/main/java/org/eclipse/microprofile/reactive/streams/spi/ReactiveStreamsEngine.java#L32
The zero argument {@code build} and {@code run} methods on subclasses of this will use

  • the {@link java.util.ServiceLoader} to load an engine for the current context classloader.

Clarify the behaviour of imported/exported org reactive stream elements

There are a number of places in the specification where org.reactivestreams (and presumably, in the future, j.u.c.Flow) elements are imported or exported. Users can't really tell how these objects should (for import) or will (for export) behave in some important aspects.

For exported objects we do not specify any aspects of the behaviour of the implementations exported so all the user has to work with is the ORS specification.

For example, for a Publisher that is the result of

  myPublisher = ReactiveStreams.of( 'a', 'b', 'c' ).buildRs();

How would this Publisher react to a second 'subscribe( subscriber )'?

Without additional specification here, a reasonable interpretation of the underlying
ORS spec might suggest the equivalent of WITH_FANOUT (meaning taken from https://doc.akka.io/japi/akka/current/akka/stream/javadsl/AsPublisher.html) as the default so it could be expected to 'work' correctly for both subscribers.

Furthermore, with two subscribers, without further specification, one would expect a backpressure policy of calling onNext() on Subscribers whose request(n) count is >0 for their subscription, sending sufficient request(n)'s upstream to enable this, and buffering for those Subscribers whose request(n) count is 0. So what of the buffer size needed to handle the variance in total request(n)'s received? It could either be implementation specific, or added to the buildRs() signature as RxJava operators do.

Similarly, for imported objects we do not specify any requirements on the implementation of the ORS/Flow interface methods, or on the state of the objects; for example, what if one 'imports' a Publisher that already has a Subscriber subscribed?

The other 'exit doors' (to external types not defined in this spec) are analogous
but they appear to be covered adequately (I will append a list below and check each).

For example, the specified behaviour of what comes out of the CompletionSubscriber::getCompletion() method, when a CompletionSubscriber is created from a Subscriber that is 'imported' from a user, is covered adequately by the javadoc of the CompletionSubscriber::getCompletion() method, which states:
"This should be redeemed by the subscriber either when it cancels, or when it receives an {@link Subscriber#onComplete} signal or an {@link Subscriber#onError(Throwable)} signal"

Consider dropping overloaded build method to receive custom engine

From @venkats:

In https://github.com/eclipse/microprofile-reactive/blob/master/streams/spec/src/main/asciidoc/architecture.asciidoc, under the Rationale section, in the paragraph that starts with "There are a number of different approaches..." the text reads: "...however users can select to use a custom implementation if they need." While this makes sense for the JDK implementation, does this make sense for a MicroProfile implementation where a third-party dependency is not desired?

Stream context in messaging spec

We should define a CDI stream context that MicroProfile Reactive Streams implementations are required to provide, and to ensure is active during the execution of all callbacks passed to Reactive Streams builder methods, eg, to the map function executed by the map operator.

This could perhaps be called @StreamScoped. It would be worth investigating context inheritance (something that I've seen mentioned, but I've never seen a mechanism for implementing it, so I have no idea what it means), since it would be good if a stream context instance could inherit from the context that a given stream is attached to.

Messaging config

We need to define an approach for configuring messaging.

I'm not sure if there are any generic configuration parameters that we want to support at this stage (though perhaps being able to supply the topic name in configuration instead of in the annotation would be good), but the important thing is that we have a consistent approach for configuring things.

In some prior discussions, we've discussed adding a configuration key attribute to the annotations. The actual name of it is certainly up for debate; configKey, configId, streamId, id, key, config etc are all examples.

The idea would be that if you said:

@Incoming(topic = "mystream", configKey = "com.example.my-stream")
public CompletionStage<?> handleEvent(MyStreamEvent event) {
    ...
}

Then configuration for this stream would first be read from the com.example.my-stream namespace, before reading it from a general location. So for example, let's say we had a configuration parameter called parallelism, in the above case, we'd first read com.example.my-stream.parallelism, if not set there, then we might read org.eclipse.microprofile.reactive.messaging.parallelism, before finally defaulting to something else. We could also say that you can override the topic defined on the annotation by setting com.example.my-stream.topic in the configuration.
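
The fallback chain described above can be sketched with a plain map standing in for the configuration source. This is a hypothetical illustration of the proposal: the key prefixes, the "parallelism" property and the class name are all assumptions taken from the discussion, not part of any spec.

```java
import java.util.Map;

// Hypothetical sketch of the proposed lookup order: stream-specific
// configKey namespace first, then a spec-wide namespace, then a default.
class StreamConfigSketch {

    static final String SPEC_PREFIX = "org.eclipse.microprofile.reactive.messaging.";

    static String lookup(Map<String, String> config, String configKey,
                         String property, String fallback) {
        // 1. stream-specific namespace from the annotation's configKey
        String value = config.get(configKey + "." + property);
        // 2. general spec-wide namespace
        if (value == null) {
            value = config.get(SPEC_PREFIX + property);
        }
        // 3. hard-coded default
        return value != null ? value : fallback;
    }
}
```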

Ability to create `ProcessorBuilder` from a `SubscriberBuilder` and a `PublisherBuilder`

It would be good to have the ability to create a ProcessorBuilder from a SubscriberBuilder and a PublisherBuilder.

Akka Streams provides two different methods for doing this, the first treats them as completely different streams, which is more a convenience than anything else. The second is to wrap them in a way that is "coupled", what this means is the lifecycle events - cancel, onComplete and onError are transferred between the Subscriber and the Publisher, so for example, if the Subscriber receives an onComplete, then the resulting Processor emits onComplete, and cancels the Publisher. Likewise, if the Publisher receives a cancel, then the resulting Processor also emits cancel upstream, and sends onError(CancellationException) to the Subscriber.

I think this will be useful for a number of messaging scenarios, for example, if integrating with a library that provides a Publisher and Subscriber.

Fold processing stage

I think a folding processing stage would be good. There are a number of use cases that I've come up with, particularly in IoT scenarios.

Let's say you have an IoT device that every few seconds sends the status of a door, ie, whether the door is opened or closed. You want to convert this status to an action, ie, a message should only be sent when the door changes from opened to closed, or vice versa, so that you can then do further processing down the stream without overwhelming your system with these status messages every few seconds. This is what it might look like:

public enum DoorStatus {
  Open, Closed;
  public DoorAction toAction() {
    switch (this) {
      case Open: return DoorAction.Opened;
      case Closed: return DoorAction.Closed;
      default: throw new IllegalStateException();
    }
  }
}
public enum DoorAction {
  Opened, Closed;
}

@Incoming(provider = WebSocket.class, topic = "door-status")
@Outgoing(provider = Kafka.class, topic = "door-action")
public ProcessorBuilder<DoorStatus, DoorAction> convertDoorStatusToAction() {
  return ReactiveStreams.<DoorStatus>builder()
    // Fold to keep the last two statuses in an array,
    // we use an array because Java doesn't have tuples
    .fold(new DoorStatus[0], (prevTwo, next) -> {
      if (prevTwo.length == 0) return new DoorStatus[] { next };
      else return new DoorStatus[] { next, prevTwo[0] };
    })
    // Now convert each pair of consecutive statuses to an optional action
    .flatMapIterable(pair -> {
      // If it's the first one (due to new connection), consider the first one an action
      if (pair.length == 1) return List.of(pair[0].toAction());
      else if (pair[0] == pair[1]) return List.of();
      else return List.of(pair[0].toAction());
    });
}

Another use case might be that you have a temperature sensor, and you want to output a running average. This can be done in a very similar way to above.

One of the things that makes this not so nice is that Java doesn't have a concept of tuples - typically when you do a fold, you want to fold into a tuple of your state and an event that you want to output, and then a subsequent map will just emit the event and drop the state. But without tuple support in Java, you have to either hack it using arrays, like the above, or create a new class just to hold your tuple. Of course, there are libraries out there that provide tuples, eg Akka provides Pair, though it's still cumbersome without any sort of syntactic sugar provided by the language.
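
The running-average use case can be sketched the same way, with the fold's state held in a two-element array standing in for a tuple. This is a hypothetical, non-reactive illustration; the class and method names are invented for the sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the running-average fold: state is a
// (count, sum) pair hacked into an array, and the current average is
// emitted with each incoming reading.
class RunningAverageSketch {

    static List<Double> runningAverages(List<Double> readings) {
        List<Double> averages = new ArrayList<>();
        double[] state = {0.0, 0.0}; // {count, sum}, an array as a poor man's tuple
        for (double reading : readings) {
            state[0] += 1.0;
            state[1] += reading;
            averages.add(state[1] / state[0]);
        }
        return averages;
    }
}
```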

Can @Outgoing be @Named?

Methods annotated with @Outgoing could also be annotated with @Named, so that other methods using @Incoming would be able to retrieve them.

Example:

@Incoming(topic = "my-stream")
@Named("toUpperCase")
public Publisher<String> toUppercase(Flowable<String> input) {
    return input.map(String::toUpperCase);
}

// ....
@Incoming(topic = "toUpperCase")
@Outgoing(topic = "my-output")
public PublisherBuilder<String> duplicate(PublisherBuilder<String> input) {
    return input.flatMap(s -> ReactiveStreams.of(s, s));
}

Should `flatMap` also support receiving a `Publisher`

In the current version, the flatMap operator accepts a function returning a PublisherBuilder. What about also being able to return a Publisher directly?

While I agree we can always use ReactiveStreams.fromPublisher(actualPublisher), it could be simplified.

Obviously, because of Java's limitations on generics, this new operator would need to be named differently, perhaps flatMapPublisher.

Add a scope section into the stream specification

This section would define the scope of the spec:

  • No CDI integration - but implementations can provide one
  • No container integration - but implementations can provide one
  • The spec is purely about providing operators to manipulate Reactive Streams. It proposes an API close to java.util.stream.

Exception handling in streams

All of the streams callbacks are using java.util.function.*, which don't allow throwing checked exceptions. Should we allow streams to be completed with checked exceptions, by providing a wrapper unchecked exception that gets unwrapped when caught? java.util.concurrent.CompletionException could be a candidate to use.

One argument for not doing this is that CompletionStage doesn't do it, so it may be worth being consistent in behavior with CompletionStage. On the other hand, it's much nicer for users to not have to unwrap exceptions.
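
The wrapper idea can be sketched as follows. This is a hypothetical illustration of the proposal, with invented helper names: a checked exception thrown by a user callback is wrapped in java.util.concurrent.CompletionException to cross the java.util.function.* boundary, and unwrapped again when the failure is observed.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionException;

// Hypothetical sketch: wrap checked exceptions for functional
// interfaces, unwrap them when the stream's failure is handled.
class CheckedExceptionSketch {

    static <T> T wrapChecked(Callable<T> callable) {
        try {
            return callable.call();
        } catch (RuntimeException e) {
            throw e; // unchecked exceptions pass through untouched
        } catch (Exception e) {
            throw new CompletionException(e); // checked exceptions get wrapped
        }
    }

    static Throwable unwrap(Throwable failure) {
        if (failure instanceof CompletionException && failure.getCause() != null) {
            return failure.getCause();
        }
        return failure;
    }
}
```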

Improve loading of ReactiveStreamsFactory and ReactiveStreamsEngine to be in line with the solution in other specs

Currently, ReactiveStreamsFactory is loaded using service loader in a private factory() method. Other specs do something similar to provide an implementation (Config in ConfigProviderResolver, REST client, etc.)

Other specs that use this pattern also do the following, which the current solution in ReactiveStreamsFactory.factory() does not:

  • load the implementation using the thread context classloader instead of the default classloader
  • provide a means to set an implementation directly and bypass the service loader; this is to support environments where the service loader isn't supported, mainly OSGi

An example of a solution, which meets the above 2 points, is the ConfigProviderResolver.instance() in the Config spec.
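
That pattern can be sketched as follows. This is a hypothetical illustration, not the actual ConfigProviderResolver or ReactiveStreamsFactory code: the Engine interface and resolver class are invented stand-ins for ReactiveStreamsEngine and its loader.

```java
import java.util.Iterator;
import java.util.ServiceLoader;

// Hypothetical sketch of the ConfigProviderResolver-style pattern:
// an explicitly set instance wins (for environments without service
// loader support, such as OSGi), otherwise fall back to ServiceLoader
// on the thread context classloader.
class EngineResolverSketch {

    interface Engine {}

    private static volatile Engine instance;

    // Explicit override, bypassing service loading entirely.
    static void setInstance(Engine engine) {
        instance = engine;
    }

    static Engine instance() {
        Engine engine = instance;
        if (engine != null) {
            return engine;
        }
        ClassLoader tccl = Thread.currentThread().getContextClassLoader();
        Iterator<Engine> loaded = ServiceLoader.load(Engine.class, tccl).iterator();
        if (loaded.hasNext()) {
            return loaded.next();
        }
        throw new IllegalStateException("No Engine implementation found");
    }
}
```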

The TCK has an unsatisfied dependency on reactive-streams-examples

ReactiveStreamsArquillianTck places the package org.reactivestreams.tck and its subpackages into the test war. That creates a dependency on reactive-streams-examples.

For example, org.reactivestreams.tck.flow.support.HelperPublisher imports org.reactivestreams.example.unicast.AsyncIterablePublisher.

Is MessagingProvider that important?

The @Incoming and @Outgoing annotations have a provider attribute. This provider indicates the transport technology used by the stream. I'm not sure this attribute is relevant as the user code should not care much about it. Typically, if we have something like:

@Incoming(topic="hello")
...

The implementation can check if there is a stream (Publisher) named hello defined in the configuration or somewhere else (@Produces annotation - this will be covered by another issue). In the first case, it's the configuration that describes the transport layer (and the rest of the configuration such as the broker url, credentials...).

Setup CI

We need to setup CI on this repo. I'm not sure what's required for that.

Align API with Java Streams - few missing methods

A few methods (or variants) are missing compared with the Java Stream API. Obviously, not all of them make sense, but some may have benefits. Here is the list of methods from Java Streams that we don't currently provide. I've already eliminated the xStream APIs.

  • boolean allMatch(Predicate<? super T> predicate) - Returns whether all elements of this stream match the provided predicate.

  • boolean anyMatch(Predicate<? super T> predicate) - Returns whether any elements of this stream match the provided predicate.

  • <R> R collect(Supplier<R> supplier, BiConsumer<R,? super T> accumulator, BiConsumer<R,R> combiner) - Performs a mutable reduction operation on the elements of this stream.

  • long count() - Returns the count of elements in this stream.

  • Stream<T> distinct() - Returns a stream consisting of the distinct elements (according to Object.equals(Object)) of this stream.

  • Optional<T> findAny() - Returns an Optional describing some element of the stream, or an empty Optional if the stream is empty.

  • static <T> Stream<T> iterate(T seed, Predicate<? super T> hasNext, UnaryOperator<T> next) - Returns a sequential ordered Stream produced by iterative application of the given next function to an initial element, conditioned on satisfying the given hasNext predicate.

  • boolean noneMatch(Predicate<? super T> predicate) - Returns whether no elements of this stream match the provided predicate.

Some notes:

  • collect - this is just a different variant
  • findAny - not sure of the value as we have findFirst
  • iterate - this is just a different variant
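
For reference, this is the JDK8 Stream behaviour that the match operators would mirror; a reactive equivalent would presumably redeem these booleans via a CompletionStage, cancelling upstream as soon as the answer is known. The helper class and methods are invented for this sketch.

```java
import java.util.List;

// Demonstrates the java.util.stream match operators that the proposal
// would carry over to reactive streams.
class MatchSketch {

    static boolean allPositive(List<Integer> values) {
        return values.stream().allMatch(v -> v > 0);
    }

    static boolean anyNegative(List<Integer> values) {
        return values.stream().anyMatch(v -> v < 0);
    }

    static boolean noneZero(List<Integer> values) {
        return values.stream().noneMatch(v -> v == 0);
    }
}
```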

Write more details on reactive architectures in approach to reactive doc

The approach to reactive doc currently only has a single paragraph on messaging in the reactive architecture section. A lot more detail is needed. We should have some more detail on guiding principles, eg, talking about eventual consistency, achieving that without transactions, at least once guarantees using offsets. We should also talk more about specific specs in detail, messaging is one, but also event logs, event sourcing, CQRS etc.

Error handling strategy

I've been implementing a lot more tests for the streams TCK, and something has come up.

Should errors be allowed to overtake elements? This turns out to be a very philosophical question, here's some discussion about it from 4 years ago when Akka Streams was being implemented:

akka/akka#15346

See the linked discussion on a commit for even more details.

So, Akka's stance here is that if the stream has an error, it should be propagated ASAP. That means, if there are any buffered elements in front of it, the error should overtake them. I think the argument for it is generally that, since onError can be invoked at any time, regardless of whether there is demand or not, it should not be held up by elements in front of it. That said, onComplete can also be invoked at any time regardless of whether there is demand or not, but the same rule doesn't apply to it.

I don't know what RxJava's strategy is.

Here's an example of where this will make a difference:

ReactiveStreams.concat(
  ReactiveStreams.of(1, 2),
  ReactiveStreams.failed(new RuntimeException("failed"))
).recover(t -> 3).toList();

If failing fast, then it is technically non-deterministic what the resulting list will be - though in practice, it will be just List(3). However, if failures are not allowed to overtake elements, then the result will be List(1, 2, 3).

Another possibility for overtaking in the current API exists in flatMap* methods, if an error comes from upstream while the current Publisher/CompletionStage/Iterable is still emitting elements, then that error will overtake the elements.

There are a few ways that we can deal with this. One thing about errors overtaking elements is that it is generally non-deterministic. So you technically can't require that an implementation always allows errors to overtake elements; you can only say that errors are allowed to overtake elements, which effectively gives implementations the option of doing it or not. Even if you try to contrive a TCK test to ensure it happens in certain situations, you risk the test relying too much on unspecified implementation details. So I think there are two options:

  1. Allow errors to overtake elements
  2. Require that errors must never overtake elements

Implementing number 2 on top of Akka Streams is certainly going to be difficult, but I've found that even implementing number 2 in the zero-dependency implementation makes a lot of very simple stages more complicated than they would otherwise need to be.

Context detached streams

Currently the messaging spec specifies streams that are attached to the lifecycle of the context in which they run - so they are automatically started when the context is started, and automatically shut down when the context is destroyed, and kept running (restarted in case of failure) in the meantime. These streams are provided by declaring an annotated method.

There are also use cases that require streams detached from any particular context, such as the ability to publish a single event in response to an HTTP request, as well as the ability to plumb a WebSocket stream into a message broker. These streams would be provided by declaring @Inject annotated stream types (e.g. PublisherBuilder and SubscriberBuilder), and maybe also messaging-specific equivalents that offer more functionality; the container would inject these into beans so that they can be started/stopped by the developer on demand.

Split streams and messaging repositories

The MicroProfile release process won't work, without major changes, with two specs in the same repository. Also, it's probably better not to have two specs in the same repository. So, as @OndrejM suggested in the Reactive Hangout yesterday, we need to split the repositories.

One good thing about them being together was that the messaging spec could depend on the streams spec without it being released. But since a milestone release of the streams spec is imminent, that soon won't be a problem.

So, I've created the following request for the repo to be split:

https://bugs.eclipse.org/bugs/show_bug.cgi?id=537777

What will happen is that this repo will be renamed to microprofile-reactive-streams, and a new microprofile-reactive-messaging repo will be created. Since there has been more work, iteration and discussion about the streams spec than the messaging spec, and since the streams spec is close to release while the messaging spec is still a fair way out, I think keeping the history with streams is the best option.

Hopefully the new repo should be created soon, but in the meantime, I'm going to do the following in preparation for a release:

  • Create a new streams-only branch.
  • Remove the messaging project from the streams-only branch.
  • Cut the milestone release from the streams-only branch.

Once the new messaging repo is created, we can merge the streams-only branch into master.

Async collectors

Currently we're using the JDK8 Collector API for all terminal accumulations. It would be good to have an async alternative, e.g. AsyncCollector, where the accumulator and finisher functions return a CompletionStage of a value when they are done. Async alternatives of collect, reduce, etc could then be provided.
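As a sketch of what such an API might look like (the AsyncCollector name and shape are hypothetical, loosely mirroring java.util.stream.Collector, and the sequential fold is just one possible collect strategy):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical shape, loosely mirroring java.util.stream.Collector. */
interface AsyncCollector<T, A, R> {
    Supplier<A> supplier();
    BiFunction<A, T, CompletionStage<A>> accumulator();
    Function<A, CompletionStage<R>> finisher();
}

public class AsyncCollectorSketch {
    /** Sequentially folds the items through the asynchronous accumulator. */
    static <T, A, R> CompletionStage<R> collect(Iterable<T> items, AsyncCollector<T, A, R> c) {
        CompletionStage<A> acc = CompletableFuture.completedFuture(c.supplier().get());
        for (T item : items) {
            acc = acc.thenCompose(a -> c.accumulator().apply(a, item));
        }
        return acc.thenCompose(a -> c.finisher().apply(a));
    }

    /** Example: a summing collector whose stages happen to complete immediately. */
    static AsyncCollector<Integer, Integer, Integer> summing() {
        return new AsyncCollector<Integer, Integer, Integer>() {
            public Supplier<Integer> supplier() { return () -> 0; }
            public BiFunction<Integer, Integer, CompletionStage<Integer>> accumulator() {
                return (a, t) -> CompletableFuture.completedFuture(a + t);
            }
            public Function<Integer, CompletionStage<Integer>> finisher() {
                return a -> CompletableFuture.completedFuture(a);
            }
        };
    }
}
```

In a real stream the accumulator could just as well complete on another thread (e.g. after a database write), which is exactly what the synchronous Collector API can't express.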

Discussion for *Builder naming

The current names for the primary classes are PublisherBuilder, SubscriberBuilder, ProcessorBuilder and CompletionBuilder. These accurately reflect what they do: they are used to build publishers, subscribers, processors and complete graphs. When working with them fluently, the names don't even come into play:

Publisher<String> publisher = ReactiveStreams.fromPublisher(somePublisher)
  .map(Object::toString)
  .build();

However, one of the use cases for this library is for APIs to use the interfaces directly. For example, here's how a developer might use the messaging spec:

@Incoming(topic = "mytopic")
public SubscriberBuilder<MyMessage, Void> subscribeToMyMessage() {
  return ReactiveStreams.<MyMessage>builder()
    .forEach(message -> System.out.println("I got a message: " + message));
}

In this case, the Builder name features very prominently in a place that might not be so intuitive.

This issue is to discuss this naming and whether the above is acceptable, or whether we should try to come up with different names. The names Publisher, Subscriber and Processor are out due to the confusion they might introduce with Reactive Streams itself, and the fact that imports for the Reactive Streams interfaces are already on the classpath.

One option is to go for a completely different set of names, as some other Reactive Streams libraries do. For example, RxJava uses Observable for Publisher and doesn't really have an equivalent for Processor and Subscriber. Reactor uses Flux and Mono for Publisher, and also doesn't have an equivalent for Processor and Subscriber. Akka Streams uses Source, Flow and Sink for Publisher, Processor and Subscriber respectively, though Flow is problematic due to a name clash with the JDK9 java.util.concurrent.Flow outer class that holds all the Reactive Streams interfaces.

Flatten Stage

We discussed the interface Stage, which is huge and contains so many nested interfaces. I think it makes sense to flatten the hierarchy.

.build() vs .run()

Currently, PublisherBuilder, SubscriberBuilder and ProcessorBuilder have a .build() method, while CompletionBuilder has a .run() method. Should we change CompletionBuilder to use .build()? Consistency would be good, and it makes sense for a builder to have a build method. At the same time, unlike the other builders, building a CompletionBuilder actually runs the stream. Another possibility is to rename CompletionBuilder, maybe to RunnableStream or something like that; then it can have a run() method, since you can't do any more operations or builder methods on a CompletionBuilder anyway.

PeekStage tests not added to the test suite

The org.eclipse.microprofile.reactive.streams.tck.PeekStageVerification has not been added to the list of tests (org.eclipse.microprofile.reactive.streams.tck.ReactiveStreamsTck#allTests).

What additional methods should we have?

From @venkats:

I am quite eager to see implementations of highly useful methods like zip, scan, etc. These are very lacking in the (Java 8) Stream API and I think it is critical to have these methods.

Related to #9. As discussed in #9, for the first version of this API we want to primarily focus on functionality that's useful to messaging, and specifically the messaging spec; other operators can be added in future versions of the API. For those future versions we can define new applications of the API and therefore come up with new operators that will be useful in those applications.

But for now, there most likely are operators that will be useful to users of the messaging spec, so the question is: what are they, and what use cases will they fulfil?
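For reference, the semantics of two of the requested operators can be sketched over plain lists (a synchronous stand-in for streams; the method names simply mirror the usual operator names):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

public class OperatorSemantics {
    /** zip: pair up elements from two sources until the shorter one is exhausted. */
    static <A, B, C> List<C> zip(List<A> as, List<B> bs, BiFunction<A, B, C> combine) {
        int n = Math.min(as.size(), bs.size());
        List<C> out = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            out.add(combine.apply(as.get(i), bs.get(i)));
        }
        return out;
    }

    /** scan: emit every intermediate accumulation of a fold, starting with the zero. */
    static <T, A> List<A> scan(List<T> ts, A zero, BiFunction<A, T, A> fold) {
        List<A> out = new ArrayList<>();
        A acc = zero;
        out.add(acc);
        for (T t : ts) {
            acc = fold.apply(acc, t);
            out.add(acc);
        }
        return out;
    }
}
```

In a reactive setting these would of course be demand-driven rather than eager, but the element-level semantics are the same.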

Rename messaging spec to events spec

Should we rename MicroProfile Reactive Messaging to MicroProfile Reactive Events?

To be clear, this wouldn't mean that the spec doesn't deal with messages that aren't events, it can certainly still deal with that. But what it does mean is that the focus of the spec is events, with messages being a secondary concern, and it also means that it can handle events that aren't messages, such as publishing events from an event log.

One reason for doing this is to piggyback on the momentum and marketing around CloudEvents, aligning with a general industry progression towards treating events as an important top-level concept in cloud native systems.

PublisherHelper, SubscriptionHelper etc.

I was interested to read:
https://medium.com/@olehdokuka/mastering-own-reactive-streams-implementation-part-1-publisher-e8eaf928a78c
about the demands of building your own correct Publisher state machine, when the domain specific code is really what you are interested in and you just want to concentrate on emitting events.

I wondered if there is something we should do to help along the lines of
https://github.com/ReactiveX/RxJava/blob/2.x/src/main/java/io/reactivex/internal/subscriptions/SubscriptionHelper.java
https://github.com/ReactiveX/RxJava/blob/2.x/src/main/java/io/reactivex/internal/util/BackpressureHelper.java

Perhaps we could provide a class implementing a standard abstract state machine that performs operations in the 'correct' sequence, with plugin-type/functional callbacks for the domain-specific code, along the lines of the Rx helpers - or maybe some 'default' methods in the interfaces to flag at least some of the pitfalls to a naive implementation.

Add support for @Produces?

In the Reactive Messaging spec, it could be interesting to add support for the @Produces annotation:

@Produces
@Named("my-stream")
public PublisherBuilder<String> produceAStreamOfString() {
   // ....
}

The following could be supported:

  • Publisher (or sub-classes), PublisherBuilder
  • Subscriber (or sub-classes), SubscriberBuilder

The @Named annotation is used to indicate the name that can be used in the @Incoming and @Outgoing annotations.

MicroProfile Reactive Streams title

MicroProfile Reactive Streams is too easily confused with Reactive Streams itself; we need a better title. Possibilities include:

  • MicroProfile Reactive Operators
  • MicroProfile Reactive Ops
  • MicroProfile Reactive Streams Ops

Remove operators from PublisherBuilder

From @venkats:

I feel that the methods in PublisherBuilder, like map and filter, are redundant since they're also in ProcessorBuilder. It may be better to make the PublisherBuilder a lot smaller and reuse the ProcessorBuilder more to build the pipeline?

Asynchronous boundaries

We may want to provide the ability to introduce asynchronous boundaries into streams, such that different parts of a stream can have different execution contexts, buffering strategies, etc. One possibility to provide this is to support the following method:

public ProcessorBuilder<T, R> async(ReactiveStreamsEngine engine);

This would cause all stages prior to this stage to be executed using the passed-in engine (which may have a custom configuration), and means that each engine is free to define what customization of execution attributes it supports. All subsequent stages would be run by whatever engine is passed to the eventual build method (or the next async method). The underlying mechanism for the async boundary would be to use the passed-in engine to build the Processor or Publisher, then connect the newly built Processor or Publisher via the subscribe method when it's run.
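The thread-handoff idea behind such a boundary can be illustrated with plain CompletableFuture and executors. This only models the boundary, it is not the proposed API; the executor names are illustrative:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncBoundarySketch {
    /** Runs the "upstream" stage on one executor and the "downstream" stage on another. */
    static String crossBoundary() {
        ExecutorService upstream = Executors.newSingleThreadExecutor(r -> new Thread(r, "upstream"));
        ExecutorService downstream = Executors.newSingleThreadExecutor(r -> new Thread(r, "downstream"));
        try {
            return CompletableFuture
                // before the boundary: runs on the upstream executor
                .supplyAsync(() -> Thread.currentThread().getName(), upstream)
                // after the boundary: runs on the downstream executor
                .thenApplyAsync(name -> name + "->" + Thread.currentThread().getName(), downstream)
                .join();
        } finally {
            upstream.shutdown();
            downstream.shutdown();
        }
    }
}
```

In the proposed design, the equivalent handoff would happen where the upstream engine's Publisher is subscribed to by the downstream engine's Processor.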

Stateful variants of functions

Currently, map, filter etc must be stateless, otherwise the returned builder can't be reused. We might consider adding stateful versions of them, i.e. where the function is provided by a Supplier. This would allow you to create things like a ProcessorBuilder<ByteBuffer, CharSequence> for decoding bytes into characters that could be applied to many graphs, without having to create a new ProcessorBuilder each time.

We might also consider making the SPI provide this, even if we don't offer it in the API, so that it can be added in future. The filter stage already allows stateful filtering, which is how drop and dropWhile are implemented.

We could also have a general stateful stage constructor in the API, which is a Supplier<Graph> that gets supplied when the graph is built. This wouldn't need support in the SPI.
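A minimal illustration of the Supplier idea, using plain java.util.stream as a stand-in for a stream graph (the names are illustrative): the Supplier yields fresh state per materialization, so one stage definition can safely be applied to many streams.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

public class StatefulStageSketch {
    // The Supplier yields a fresh counter each time, so this stage definition
    // can be applied to many streams without state leaking between them.
    static final Supplier<Function<String, String>> ZIP_WITH_INDEX = () -> {
        AtomicLong counter = new AtomicLong();
        return s -> counter.getAndIncrement() + ":" + s;
    };

    /** Each call materializes the stage with fresh state. */
    static List<String> run(List<String> input) {
        Function<String, String> stage = ZIP_WITH_INDEX.get();
        return input.stream().map(stage).collect(Collectors.toList());
    }
}
```

Had ZIP_WITH_INDEX been a single stateful Function rather than a Supplier, the counter would carry over from one run to the next, which is exactly the reuse problem described above.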

Should the spec include an interface and bootstrapping for type 'ReactiveStreams'?

I would like to discuss:

There is a suggestion above to make the builders (say PublisherBuilder, ProcessorBuilder, SubscriberBuilder) interfaces. This is primarily to enable easier consumption via javadoc, IDEs, etc.

As well as alternative implementations, interfaces enable dynamic proxies, and thus injection, interceptors, parts of AOP, etc.

Is there a valid use case here that interfaces would enable?

Say one looks for a potential place to inject/proxy/intercept etc an
impl of

PublisherBuilder
ProcessorBuilder
SubscriberBuilder

The place to look is where one initially gets hold of them. Starting with PublisherBuilder, the places are:

1 - Its two constructors
2 - ReactiveStreams https://github.com/eclipse/microprofile-reactive-streams/blob/master/api/src/main/java/org/eclipse/microprofile/reactive/streams/ReactiveStreams.java

So...
1 - Obviously the constructors in 1 have a concrete type that is not replaceable.
2 - ReactiveStreams is a concrete class with a fixed implementation (albeit, I imagine, a sensible and wise one) that also returns concrete classes, and so on, until the 'run' from a non-concrete, loadable/injectable ReactiveStreamsEngine.

At the moment we have an interface and bootstrapping method (service loading and injection mechanism) in the spec for ReactiveStreamsEngine.

I suggest we should explore having an interface and bootstrapping method for each 'way in' (e.g. ReactiveStreams.java) to enable independent implementations, or find a good reason not to.

Marble diagrams

It would be great to have marble diagrams for each operator, similar to diagrams offered here:

http://rxmarbles.com/
http://reactivex.io/RxJava/javadoc/io/reactivex/Flowable.html

Like RxJava, we could provide these diagrams directly in the javadocs, and possibly also extract them to some external documentation.

The diagrams should be created using an open source tool or language, so that the original source can be checked in to the repo and maintained by anyone. It may be worth investigating what rxmarbles uses. Another possibility is to use one of the JavaScript SVG libraries; an abstraction for creating marble diagrams could be built on top of that. If done well, this would make it really easy to create new diagrams, as well as easy to maintain, code review, etc.

More examples specific to Reactive Streams

From @venkats:

The examples given at https://github.com/eclipse/microprofile-reactive/blob/master/streams/spec/src/main/asciidoc/examples.asciidoc are very similar to what Java 8 streams can do. It may be better to show examples that are different. For instance, how are errors handled? How do we perform tasks in a different thread? In other words, seeing enough difference from the use of Stream may benefit the reader as to (a) how this implementation is different from Stream, while parts of it are similar, and (b) how they can truly work with asynchrony instead of parallelism.

Naming inconsistencies inherited from the Java 8 Stream API

From @venkats:

I truly do not like how the (Java 8) Stream API has been inconsistent in naming. For example, limit but takeWhile, and skip but dropWhile. I encourage the API used in the builders to be more consistent. For example, it may be better to use either limit and limitWhile, or take and takeWhile.

Alternative names for SubscriberWithResult

SubscriberWithResult is a product of a Reactive Streams Subscriber and a CompletionStage. It is used to hold both the thing that consumes a stream and the result of consuming that stream, whether that result is a reduction of the elements of the stream or, in some cases, has no value (is Void), with the CompletionStage indicating whether the stream completed normally or with an error.
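The shape being named is just a product type. A self-contained sketch, with Consumer standing in for org.reactivestreams.Subscriber to avoid the dependency, and a hypothetical summing example:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;

/**
 * Sketch of the product type: a stream consumer paired with the eventual
 * result of consuming the stream. Consumer<T> stands in for
 * org.reactivestreams.Subscriber<T> to keep this self-contained.
 */
public class SubscriberWithResultSketch<T, R> {
    final Consumer<T> subscriber;
    final CompletionStage<R> result;

    SubscriberWithResultSketch(Consumer<T> subscriber, CompletionStage<R> result) {
        this.subscriber = subscriber;
        this.result = result;
    }

    /** Example: sums the integers it sees; the result completes when `done` does. */
    static SubscriberWithResultSketch<Integer, Integer> summing(CompletableFuture<Void> done) {
        int[] sum = {0};
        return new SubscriberWithResultSketch<>(i -> sum[0] += i, done.thenApply(v -> sum[0]));
    }
}
```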

I don't particularly like the name, it adequately describes what it is to be sure, but it's long. Here are some alternatives that I've thought of:

  • Accumulator - essentially, the Subscriber is responsible for accumulating a value into the CompletionStage. We use this name in Play Framework for a somewhat similar abstraction. Some potential issues with this name: for cases where there's no result value (i.e. Void), the name isn't very intuitive. Also, the term Accumulator is already used in the JDK in LongAccumulator and DoubleAccumulator.
  • SubscriberResult - a shortening of SubscriberWithResult

Any other ideas?
