reactive-streams-jvm's Introduction

Reactive Streams

The purpose of Reactive Streams is to provide a standard for asynchronous stream processing with non-blocking backpressure.

The latest release is available on Maven Central as

<dependency>
  <groupId>org.reactivestreams</groupId>
  <artifactId>reactive-streams</artifactId>
  <version>1.0.4</version>
</dependency>
<dependency>
  <groupId>org.reactivestreams</groupId>
  <artifactId>reactive-streams-tck</artifactId>
  <version>1.0.4</version>
  <scope>test</scope>
</dependency>

Goals, Design and Scope

Handling streams of data—especially “live” data whose volume is not predetermined—requires special care in an asynchronous system. The most prominent issue is that resource consumption needs to be carefully controlled such that a fast data source does not overwhelm the stream destination. Asynchrony is needed in order to enable the parallel use of computing resources, on collaborating network hosts or multiple CPU cores within a single machine.

The main goal of Reactive Streams is to govern the exchange of stream data across an asynchronous boundary (think passing elements on to another thread or thread-pool) while ensuring that the receiving side is not forced to buffer arbitrary amounts of data. In other words, backpressure is an integral part of this model in order to allow the queues which mediate between threads to be bounded. The benefits of asynchronous processing would be negated if the backpressure signals were synchronous (see also the Reactive Manifesto), therefore care has been taken to mandate fully non-blocking and asynchronous behavior of all aspects of a Reactive Streams implementation.

It is the intention of this specification to allow the creation of many conforming implementations, which by virtue of abiding by the rules will be able to interoperate smoothly, preserving the aforementioned benefits and characteristics across the whole processing graph of a stream application.

It should be noted that the precise nature of stream manipulations (transformation, splitting, merging, etc.) is not covered by this specification. Reactive Streams are only concerned with mediating the stream of data between different API Components. In their development care has been taken to ensure that all basic ways of combining streams can be expressed.

In summary, Reactive Streams is a standard and specification for Stream-oriented libraries for the JVM that

  • process a potentially unbounded number of elements
  • in sequence,
  • asynchronously passing elements between components,
  • with mandatory non-blocking backpressure.

The Reactive Streams specification consists of the following parts:

The API specifies the types to implement Reactive Streams and achieve interoperability between different implementations.

The Technology Compatibility Kit (TCK) is a standard test suite for conformance testing of implementations.

Implementations are free to implement additional features not covered by the specification as long as they conform to the API requirements and pass the tests in the TCK.

API Components

The API consists of the following components that are required to be provided by Reactive Streams implementations:

  1. Publisher
  2. Subscriber
  3. Subscription
  4. Processor

A Publisher is a provider of a potentially unbounded number of sequenced elements, publishing them according to the demand received from its Subscriber(s).

In response to a call to Publisher.subscribe(Subscriber) the possible invocation sequences for methods on the Subscriber are given by the following protocol:

onSubscribe onNext* (onError | onComplete)?

This means that onSubscribe is always signalled, followed by a possibly unbounded number of onNext signals (as requested by Subscriber) followed by an onError signal if there is a failure, or an onComplete signal when no more elements are available—all as long as the Subscription is not cancelled.
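The protocol above can be read as a small state machine. The following is a minimal sketch, not part of the specification: `SignalProtocol` and its `isValid` method are hypothetical names, and signals are represented as plain strings purely for illustration.

```java
import java.util.List;

/** Checks a recorded trace against: onSubscribe onNext* (onError | onComplete)? */
final class SignalProtocol {
    private SignalProtocol() {}

    static boolean isValid(List<String> signals) {
        if (signals.isEmpty() || !signals.get(0).equals("onSubscribe")) {
            return false; // onSubscribe is always the first signal
        }
        for (int i = 1; i < signals.size(); i++) {
            String s = signals.get(i);
            boolean terminal = s.equals("onError") || s.equals("onComplete");
            if (terminal) {
                return i == signals.size() - 1; // nothing may follow a terminal signal
            }
            if (!s.equals("onNext")) {
                return false; // e.g. a second onSubscribe
            }
        }
        return true; // no terminal signal: stream still open, or cancelled
    }

    public static void main(String[] args) {
        System.out.println(isValid(List.of("onSubscribe", "onNext", "onNext", "onComplete"))); // true
        System.out.println(isValid(List.of("onSubscribe", "onComplete", "onNext")));           // false
        System.out.println(isValid(List.of("onNext", "onSubscribe")));                         // false
    }
}
```

A trace that ends without a terminal signal is accepted, which matches the trailing `?` in the protocol: a cancelled Subscription may never see onError or onComplete.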

NOTES

Glossary

Term Definition
Signal As a noun: one of the onSubscribe, onNext, onComplete, onError, request(n) or cancel methods. As a verb: calling/invoking a signal.
Demand As a noun, the aggregated number of elements requested by a Subscriber which is yet to be delivered (fulfilled) by the Publisher. As a verb, the act of request-ing more elements.
Synchronous(ly) Executes on the calling Thread.
Return normally Only ever returns a value of the declared type to the caller. The only legal way to signal failure to a Subscriber is via the onError method.
Responsivity Readiness/ability to respond. In this document used to indicate that the different components should not impair each other's ability to respond.
Non-obstructing Quality describing a method which is as quick to execute as possible on the calling thread. This means, for example, that it avoids heavy computations and other things that would stall the caller's thread of execution.
Terminal state For a Publisher: When onComplete or onError has been signalled. For a Subscriber: When an onComplete or onError has been received.
NOP Execution that has no detectable effect on the calling thread, and can as such safely be called any number of times.
Serial(ly) In the context of a Signal, non-overlapping. In the context of the JVM, calls to methods on an object are serial if and only if there is a happens-before relationship between those calls (implying also that the calls do not overlap). When the calls are performed asynchronously, coordination to establish the happens-before relationship is to be implemented using techniques such as, but not limited to, atomics, monitors, or locks.
Thread-safe Can be safely invoked synchronously, or asynchronously, without requiring external synchronization to ensure program correctness.

SPECIFICATION

1. Publisher (Code)

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}
ID Rule
1 The total number of onNext´s signalled by a Publisher to a Subscriber MUST be less than or equal to the total number of elements requested by that Subscriber´s Subscription at all times.
💡 The intent of this rule is to make it clear that Publishers cannot signal more elements than Subscribers have requested. There’s an implicit, but important, consequence to this rule: Since demand can only be fulfilled after it has been received, there’s a happens-before relationship between requesting elements and receiving elements.
2 A Publisher MAY signal fewer onNext than requested and terminate the Subscription by calling onComplete or onError.
💡 The intent of this rule is to make it clear that a Publisher cannot guarantee that it will be able to produce the number of elements requested; it simply might not be able to produce them all; it may be in a failed state; it may be empty or otherwise already completed.
3 onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially.
💡 The intent of this rule is to permit the signalling of signals (including from multiple threads) if and only if a happens-before relation between each of the signals is established.
4 If a Publisher fails it MUST signal an onError.
💡 The intent of this rule is to make it clear that a Publisher is responsible for notifying its Subscribers if it detects that it cannot proceed—Subscribers must be given a chance to clean up resources or otherwise deal with the Publisher´s failures.
5 If a Publisher terminates successfully (finite stream) it MUST signal an onComplete.
💡 The intent of this rule is to make it clear that a Publisher is responsible for notifying its Subscribers that it has reached a terminal state—Subscribers can then act on this information; clean up resources, etc.
6 If a Publisher signals either onError or onComplete on a Subscriber, that Subscriber’s Subscription MUST be considered cancelled.
💡 The intent of this rule is to make sure that a Subscription is treated the same no matter if it was cancelled, the Publisher signalled onError or onComplete.
7 Once a terminal state has been signaled (onError, onComplete) it is REQUIRED that no further signals occur.
💡 The intent of this rule is to make sure that onError and onComplete are the final states of an interaction between a Publisher and Subscriber pair.
8 If a Subscription is cancelled its Subscriber MUST eventually stop being signaled.
💡 The intent of this rule is to make sure that Publishers respect a Subscriber’s request to cancel a Subscription when Subscription.cancel() has been called. The reason for eventually is because signals can have propagation delay due to being asynchronous.
9 Publisher.subscribe MUST call onSubscribe on the provided Subscriber prior to any other signals to that Subscriber and MUST return normally, except when the provided Subscriber is null, in which case it MUST throw a java.lang.NullPointerException to the caller. For all other situations the only legal way to signal failure (or reject the Subscriber) is by calling onError (after calling onSubscribe).
💡 The intent of this rule is to make sure that onSubscribe is always signalled before any of the other signals, so that initialization logic can be executed by the Subscriber when the signal is received. Also onSubscribe MUST only be called at most once, [see 2.12]. If the supplied Subscriber is null, there is nowhere else to signal this but to the caller, which means a java.lang.NullPointerException must be thrown. Examples of possible situations: A stateful Publisher can be overwhelmed, bounded by a finite number of underlying resources, exhausted, or in a terminal state.
10 Publisher.subscribe MAY be called as many times as wanted but MUST be with a different Subscriber each time [see 2.12].
💡 The intent of this rule is to have callers of subscribe be aware that a generic Publisher and a generic Subscriber cannot be assumed to support being attached multiple times. Furthermore, it also mandates that the semantics of subscribe must be upheld no matter how many times it is called.
11 A Publisher MAY support multiple Subscribers and decides whether each Subscription is unicast or multicast.
💡 The intent of this rule is to give Publisher implementations the flexibility to decide how many, if any, Subscribers they will support, and how elements are going to be distributed.
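Rule 1.9 is easier to see in code. The sketch below is an illustration only: it uses the JDK's `java.util.concurrent.Flow` interfaces (which mirror the `org.reactivestreams` ones) so that it runs without extra dependencies, and `RejectingPublisher` is a hypothetical class, not something shipped with Reactive Streams.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical Publisher that rejects every Subscriber, in the spirit of rule 1.9. */
final class RejectingPublisher<T> implements Flow.Publisher<T> {
    private final Throwable reason;

    RejectingPublisher(Throwable reason) {
        this.reason = reason;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        if (subscriber == null) {
            // The only case in which subscribe may throw.
            throw new NullPointerException("subscriber must not be null");
        }
        AtomicBoolean cancelled = new AtomicBoolean();
        // onSubscribe comes first, even though the Subscriber is about to be rejected.
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) { /* NOP: nothing is ever emitted */ }
            @Override public void cancel() { cancelled.set(true); }
        });
        if (!cancelled.get()) {
            subscriber.onError(reason); // the rejection is signalled, not thrown
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        new RejectingPublisher<String>(new IllegalStateException("closed"))
            .subscribe(new Flow.Subscriber<String>() {
                @Override public void onSubscribe(Flow.Subscription s) { log.add("onSubscribe"); }
                @Override public void onNext(String item) { log.add("onNext"); }
                @Override public void onError(Throwable t) { log.add("onError: " + t.getMessage()); }
                @Override public void onComplete() { log.add("onComplete"); }
            });
        System.out.println(log); // [onSubscribe, onError: closed]
    }
}
```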

2. Subscriber (Code)

public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}
ID Rule
1 A Subscriber MUST signal demand via Subscription.request(long n) to receive onNext signals.
💡 The intent of this rule is to establish that it is the responsibility of the Subscriber to decide when and how many elements it is able and willing to receive. To avoid signal reordering caused by reentrant Subscription methods, it is strongly RECOMMENDED for synchronous Subscriber implementations to invoke Subscription methods at the very end of any signal processing. It is RECOMMENDED that Subscribers request the upper limit of what they are able to process, as requesting only one element at a time results in an inherently inefficient "stop-and-wait" protocol.
2 If a Subscriber suspects that its processing of signals will negatively impact its Publisher´s responsivity, it is RECOMMENDED that it asynchronously dispatches its signals.
💡 The intent of this rule is that a Subscriber should not obstruct the progress of the Publisher from an execution point-of-view. In other words, the Subscriber should not starve the Publisher from receiving CPU cycles.
3 Subscriber.onComplete() and Subscriber.onError(Throwable t) MUST NOT call any methods on the Subscription or the Publisher.
💡 The intent of this rule is to prevent cycles and race-conditions—between Publisher, Subscription and Subscriber—during the processing of completion signals.
4 Subscriber.onComplete() and Subscriber.onError(Throwable t) MUST consider the Subscription cancelled after having received the signal.
💡 The intent of this rule is to make sure that Subscribers respect a Publisher’s terminal state signals. A Subscription is simply not valid anymore after an onComplete or onError signal has been received.
5 A Subscriber MUST call Subscription.cancel() on the given Subscription after an onSubscribe signal if it already has an active Subscription.
💡 The intent of this rule is to prevent two or more separate Publishers from trying to interact with the same Subscriber. Enforcing this rule means that resource leaks are prevented since extra Subscriptions will be cancelled. Failure to conform to this rule may lead to violations of Publisher rule 1, amongst others. Such violations can lead to hard-to-diagnose bugs.
6 A Subscriber MUST call Subscription.cancel() if the Subscription is no longer needed.
💡 The intent of this rule is to establish that Subscribers cannot just throw Subscriptions away when they are no longer needed, they have to call cancel so that resources held by that Subscription can be safely, and timely, reclaimed. An example of this would be a Subscriber which is only interested in a specific element, which would then cancel its Subscription to signal its completion to the Publisher.
7 A Subscriber MUST ensure that all calls on its Subscription's request and cancel methods are performed serially.
💡 The intent of this rule is to permit the calling of the request and cancel methods (including from multiple threads) if and only if a serial relation between each of the calls is established.
8 A Subscriber MUST be prepared to receive one or more onNext signals after having called Subscription.cancel() if there are still requested elements pending [see 3.12]. Subscription.cancel() does not guarantee to perform the underlying cleaning operations immediately.
💡 The intent of this rule is to highlight that there may be a delay between calling cancel and the Publisher observing that cancellation.
9 A Subscriber MUST be prepared to receive an onComplete signal with or without a preceding Subscription.request(long n) call.
💡 The intent of this rule is to establish that completion is unrelated to the demand flow—this allows for streams which complete early, and obviates the need to poll for completion.
10 A Subscriber MUST be prepared to receive an onError signal with or without a preceding Subscription.request(long n) call.
💡 The intent of this rule is to establish that Publisher failures may be completely unrelated to signalled demand. This means that Subscribers do not need to poll to find out if the Publisher will not be able to fulfill its requests.
11 A Subscriber MUST make sure that all calls on its signal methods happen-before the processing of the respective signals. I.e. the Subscriber must take care of properly publishing the signal to its processing logic.
💡 The intent of this rule is to establish that it is the responsibility of the Subscriber implementation to make sure that asynchronous processing of its signals is thread-safe. See the JMM definition of Happens-Before in section 17.4.5 of the Java Language Specification.
12 Subscriber.onSubscribe MUST be called at most once for a given Subscriber (based on object equality).
💡 The intent of this rule is to establish that it MUST be assumed that the same Subscriber can only be subscribed at most once. Note that object equality is a.equals(b).
13 Calling onSubscribe, onNext, onError or onComplete MUST return normally except when any provided parameter is null, in which case it MUST throw a java.lang.NullPointerException to the caller. For all other situations the only legal way for a Subscriber to signal failure is by cancelling its Subscription. In the case that this rule is violated, any associated Subscription to the Subscriber MUST be considered as cancelled, and the caller MUST raise this error condition in a fashion that is adequate for the runtime environment.
💡 The intent of this rule is to establish the semantics for the methods of Subscriber and what the Publisher is allowed to do when this rule is violated. «Raise this error condition in a fashion that is adequate for the runtime environment» could mean logging the error, or otherwise making someone or something aware of the situation, as the error cannot be signalled to the faulty Subscriber.
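Several of the Subscriber rules above (2.1, 2.3, 2.4, 2.5, 2.8 and 2.13) can be seen together in a small synchronous Subscriber. This is a sketch under assumptions: it uses the JDK's `java.util.concurrent.Flow` mirror of the interfaces, `BatchingSubscriber` is a hypothetical name, and its plain fields rely on the Publisher delivering signals serially (rule 1.3) with all processing done on the signalling thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Flow;

/** Hypothetical Subscriber that requests elements in fixed-size batches. */
final class BatchingSubscriber<T> implements Flow.Subscriber<T> {
    private final int batch;
    private final List<T> received = new ArrayList<>();
    private Flow.Subscription subscription; // only touched from the serial signal methods
    private int outstanding;                // requested but not yet received
    private boolean done;

    BatchingSubscriber(int batch) {
        this.batch = batch;
    }

    @Override public void onSubscribe(Flow.Subscription s) {
        Objects.requireNonNull(s, "subscription"); // rule 2.13: null is the only reason to throw
        if (subscription != null || done) {
            s.cancel();                            // rule 2.5: reject an extra Subscription
            return;
        }
        subscription = s;
        outstanding = batch;
        s.request(batch);                          // rule 2.1: no demand, no onNext
    }

    @Override public void onNext(T item) {
        Objects.requireNonNull(item, "item");
        if (done) return;                          // rule 2.8: tolerate late elements
        received.add(item);
        if (--outstanding == 0) {
            outstanding = batch;
            subscription.request(batch);           // last statement, as recommended under 2.1
        }
    }

    @Override public void onError(Throwable t) {
        Objects.requireNonNull(t, "throwable");
        done = true;                               // rules 2.3 and 2.4: treat as cancelled,
        subscription = null;                       // without calling the Subscription
    }

    @Override public void onComplete() {
        done = true;
        subscription = null;
    }

    List<T> received() {
        return received;
    }
}
```

Requesting at the very end of `onNext` keeps a synchronous Publisher from delivering the next element while this element is still being handled.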

3. Subscription (Code)

public interface Subscription {
    public void request(long n);
    public void cancel();
}
ID Rule
1 Subscription.request and Subscription.cancel MUST only be called inside of its Subscriber context.
💡 The intent of this rule is to establish that a Subscription represents the unique relationship between a Subscriber and a Publisher [see 2.12]. The Subscriber is in control over when elements are requested and when more elements are no longer needed.
2 The Subscription MUST allow the Subscriber to call Subscription.request synchronously from within onNext or onSubscribe.
💡 The intent of this rule is to make it clear that implementations of request must be reentrant, to avoid stack overflows in the case of mutual recursion between request and onNext (and eventually onComplete / onError). This implies that Publishers can be synchronous, i.e. signalling onNext´s on the thread which calls request.
3 Subscription.request MUST place an upper bound on possible synchronous recursion between Publisher and Subscriber.
💡 The intent of this rule is to complement [see 3.2] by placing an upper limit on the mutual recursion between request and onNext (and eventually onComplete / onError). Implementations are RECOMMENDED to limit this mutual recursion to a depth of 1 (ONE), for the sake of conserving stack space. An example of undesirable synchronous, open recursion would be Subscriber.onNext -> Subscription.request -> Subscriber.onNext -> …, which would otherwise blow the calling thread's stack.
4 Subscription.request SHOULD respect the responsivity of its caller by returning in a timely manner.
💡 The intent of this rule is to establish that request is intended to be a non-obstructing method, and should be as quick to execute as possible on the calling thread, so avoid heavy computations and other things that would stall the caller´s thread of execution.
5 Subscription.cancel MUST respect the responsivity of its caller by returning in a timely manner, MUST be idempotent and MUST be thread-safe.
💡 The intent of this rule is to establish that cancel is intended to be a non-obstructing method, and should be as quick to execute as possible on the calling thread, so avoid heavy computations and other things that would stall the caller´s thread of execution. Furthermore, it is also important that it is possible to call it multiple times without any adverse effects.
6 After the Subscription is cancelled, additional Subscription.request(long n) MUST be NOPs.
💡 The intent of this rule is to establish a causal relationship between cancellation of a subscription and the subsequent non-operation of requesting more elements.
7 After the Subscription is cancelled, additional Subscription.cancel() MUST be NOPs.
💡 The intent of this rule is superseded by 3.5.
8 While the Subscription is not cancelled, Subscription.request(long n) MUST register the given number of additional elements to be produced to the respective subscriber.
💡 The intent of this rule is to make sure that request-ing is an additive operation, as well as ensuring that a request for elements is delivered to the Publisher.
9 While the Subscription is not cancelled, Subscription.request(long n) MUST signal onError with a java.lang.IllegalArgumentException if the argument is <= 0. The cause message SHOULD explain that non-positive request signals are illegal.
💡 The intent of this rule is to prevent faulty implementations from proceeding without any exceptions being raised. Since requests are additive, requesting a negative number or 0 elements is most likely the result of an erroneous calculation on behalf of the Subscriber.
10 While the Subscription is not cancelled, Subscription.request(long n) MAY synchronously call onNext on this (or other) subscriber(s).
💡 The intent of this rule is to establish that it is allowed to create synchronous Publishers, i.e. Publishers who execute their logic on the calling thread.
11 While the Subscription is not cancelled, Subscription.request(long n) MAY synchronously call onComplete or onError on this (or other) subscriber(s).
💡 The intent of this rule is to establish that it is allowed to create synchronous Publishers, i.e. Publishers who execute their logic on the calling thread.
12 While the Subscription is not cancelled, Subscription.cancel() MUST request the Publisher to eventually stop signaling its Subscriber. The operation is NOT REQUIRED to affect the Subscription immediately.
💡 The intent of this rule is to establish that the desire to cancel a Subscription is eventually respected by the Publisher, acknowledging that it may take some time before the signal is received.
13 While the Subscription is not cancelled, Subscription.cancel() MUST request the Publisher to eventually drop any references to the corresponding subscriber.
💡 The intent of this rule is to make sure that Subscribers can be properly garbage-collected once their subscription is no longer valid. Re-subscribing with the same Subscriber object is discouraged [see 2.12], but this specification does not mandate that it is disallowed since that would mean having to store previously cancelled subscriptions indefinitely.
14 While the Subscription is not cancelled, calling Subscription.cancel MAY cause the Publisher, if stateful, to transition into the shut-down state if no other Subscription exists at this point [see 1.9].
💡 The intent of this rule is to allow for Publishers to signal onComplete or onError following onSubscribe for new Subscribers in response to a cancellation signal from an existing Subscriber.
15 Calling Subscription.cancel MUST return normally.
💡 The intent of this rule is to disallow implementations from throwing exceptions in response to cancel being called.
16 Calling Subscription.request MUST return normally.
💡 The intent of this rule is to disallow implementations from throwing exceptions in response to request being called.
17 A Subscription MUST support an unbounded number of calls to request and MUST support a demand up to 2^63-1 (java.lang.Long.MAX_VALUE). A demand equal to or greater than 2^63-1 (java.lang.Long.MAX_VALUE) MAY be considered by the Publisher as “effectively unbounded”.
💡 The intent of this rule is to establish that the Subscriber can request an unbounded number of elements, in any increment above 0 [see 3.9], in any number of invocations of request. As it is not feasibly reachable with current or foreseen hardware within a reasonable amount of time (1 element per nanosecond would take 292 years) to fulfill a demand of 2^63-1, it is allowed for a Publisher to stop tracking demand beyond this point.
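The "292 years" figure in the note on rule 3.17 can be checked with a few lines of arithmetic. In this sketch `DemandHorizon` and `yearsToDrain` are hypothetical names, and a year is assumed to be 365.25 days.

```java
/** Back-of-the-envelope check of how long a demand of 2^63-1 takes to fulfil. */
final class DemandHorizon {
    private DemandHorizon() {}

    static double yearsToDrain(long demand, double elementsPerSecond) {
        double seconds = demand / elementsPerSecond;
        double secondsPerYear = 365.25 * 24 * 60 * 60; // assumption: Julian year
        return seconds / secondsPerYear;
    }

    public static void main(String[] args) {
        // One element per nanosecond is 1e9 elements per second.
        double years = yearsToDrain(Long.MAX_VALUE, 1e9);
        System.out.println("about " + Math.round(years) + " years"); // about 292 years
    }
}
```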

A Subscription is shared by exactly one Publisher and one Subscriber for the purpose of mediating the data exchange between this pair. This is the reason why the subscribe() method does not return the created Subscription, but instead returns void; the Subscription is only passed to the Subscriber via the onSubscribe callback.
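To make the Subscription rules more concrete, here is a minimal sketch of a synchronous Publisher of an integer range. It is written against the JDK's `java.util.concurrent.Flow` mirror of the interfaces, `RangePublisher` is a hypothetical name, and the sketch is not a TCK-verified implementation. It illustrates additive, saturating demand (3.8, 3.17), reporting of non-positive requests via onError (3.9), NOP behaviour after cancellation (3.6, 3.7) and recursion limited to depth 1 (3.3). One simplification: an empty range only completes once some demand has been signalled.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;

final class RangePublisher implements Flow.Publisher<Integer> {
    private final int start;
    private final int count;

    RangePublisher(int start, int count) {
        this.start = start;
        this.count = count;
    }

    @Override public void subscribe(Flow.Subscriber<? super Integer> s) {
        if (s == null) throw new NullPointerException("subscriber");
        s.onSubscribe(new RangeSubscription(s, start, start + count));
    }

    private static final class RangeSubscription implements Flow.Subscription {
        private final Flow.Subscriber<? super Integer> downstream;
        private final int end;
        private int next;                                   // only touched while draining
        private final AtomicLong demand = new AtomicLong(); // requested, not yet accounted for
        private volatile boolean cancelled;
        private volatile boolean badRequest;

        RangeSubscription(Flow.Subscriber<? super Integer> downstream, int start, int end) {
            this.downstream = downstream;
            this.next = start;
            this.end = end;
        }

        @Override public void request(long n) {
            if (cancelled) return;                 // rule 3.6: NOP after cancel
            if (n <= 0) badRequest = true;         // rule 3.9: reported by the drain loop
            long add = n <= 0 ? 1 : n;             // a bad request still wakes the drain loop
            long previous = demand.getAndUpdate(d -> d + add < 0 ? Long.MAX_VALUE : d + add);
            if (previous == 0) drain();            // nested or concurrent calls only add demand
        }

        @Override public void cancel() {
            cancelled = true;                      // idempotent, returns immediately
        }

        private void drain() {
            long emitted = 0;
            for (;;) {
                if (cancelled) return;
                if (badRequest) {
                    cancelled = true;
                    downstream.onError(new IllegalArgumentException(
                        "rule 3.9: non-positive request signals are illegal"));
                    return;
                }
                if (next == end) {
                    cancelled = true;
                    downstream.onComplete();
                    return;
                }
                if (emitted != demand.get()) {
                    downstream.onNext(next++);     // may re-enter request(), which just adds demand
                    emitted++;
                } else if (demand.addAndGet(-emitted) == 0) {
                    return;                        // demand exhausted: wait for the next request
                } else {
                    emitted = 0;                   // more demand arrived while emitting
                }
            }
        }
    }
}
```

Only the caller that moves demand from zero to a positive value runs the drain loop; a `request` issued from inside `onNext` merely increments the counter and returns, so the call stack never grows beyond one level of `request` to `onNext`.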

4. Processor (Code)

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
ID Rule
1 A Processor represents a processing stage—which is both a Subscriber and a Publisher and MUST obey the contracts of both.
💡 The intent of this rule is to establish that Processors behave, and are bound by, both the Publisher and Subscriber specifications.
2 A Processor MAY choose to recover an onError signal. If it chooses to do so, it MUST consider the Subscription cancelled, otherwise it MUST propagate the onError signal to its Subscribers immediately.
💡 The intent of this rule is to inform that it’s possible for implementations to be more than simple transformations.

While not mandated, it can be a good idea to cancel a Processor´s upstream Subscription when/if its last Subscriber cancels their Subscription, to let the cancellation signal propagate upstream.
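As an illustration of a Processor that obeys both contracts, the following sketch maps each element and passes demand and cancellation straight through to its upstream. It is a simplification under stated assumptions: it uses the JDK's `java.util.concurrent.Flow` mirror of the interfaces, `MapProcessor` is a hypothetical name, only one downstream Subscriber is supported, all wiring and signalling is assumed to happen serially on one thread, and a terminal signal that arrives before a downstream is attached is dropped.

```java
import java.util.Objects;
import java.util.concurrent.Flow;
import java.util.function.Function;

final class MapProcessor<T, R> implements Flow.Processor<T, R> {
    private final Function<? super T, ? extends R> mapper;
    private Flow.Subscription upstream;
    private Flow.Subscriber<? super R> downstream;
    private boolean done;

    MapProcessor(Function<? super T, ? extends R> mapper) {
        this.mapper = Objects.requireNonNull(mapper, "mapper");
    }

    @Override public void subscribe(Flow.Subscriber<? super R> s) {
        Objects.requireNonNull(s, "subscriber");
        if (downstream != null) {                  // unicast only in this sketch
            s.onSubscribe(new Flow.Subscription() {
                @Override public void request(long n) { }
                @Override public void cancel() { }
            });
            s.onError(new IllegalStateException("this sketch supports a single Subscriber"));
            return;
        }
        downstream = s;
        if (upstream != null) connect();
    }

    @Override public void onSubscribe(Flow.Subscription s) {
        Objects.requireNonNull(s, "subscription");
        if (upstream != null) {                    // rule 2.5
            s.cancel();
            return;
        }
        upstream = s;
        if (downstream != null) connect();
    }

    private void connect() {
        // One element in, one element out: demand and cancellation pass straight through.
        downstream.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) { upstream.request(n); }
            @Override public void cancel() { upstream.cancel(); }
        });
    }

    @Override public void onNext(T item) {
        Objects.requireNonNull(item, "item");
        if (done) return;
        R mapped;
        try {
            mapped = Objects.requireNonNull(mapper.apply(item), "mapper returned null");
        } catch (RuntimeException e) {
            done = true;
            upstream.cancel();                     // failure in user code: cancel upstream,
            downstream.onError(e);                 // signal onError downstream
            return;
        }
        downstream.onNext(mapped);
    }

    @Override public void onError(Throwable t) {
        Objects.requireNonNull(t, "throwable");
        if (done || downstream == null) return;
        done = true;
        downstream.onError(t);                     // rule 4.2: propagate unless recovering
    }

    @Override public void onComplete() {
        if (done || downstream == null) return;
        done = true;
        downstream.onComplete();
    }
}
```

Because the downstream's `cancel` is forwarded directly, cancellation by the only Subscriber propagates upstream, which is the behaviour the paragraph above suggests.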

Asynchronous vs Synchronous Processing

The Reactive Streams API prescribes that all processing of elements (onNext) or termination signals (onError, onComplete) MUST NOT block the Publisher. However, each of the on* handlers can process the events synchronously or asynchronously.

Take this example:

nioSelectorThreadOrigin map(f) filter(p) consumeTo(toNioSelectorOutput)

It has an async origin and an async destination. Let’s assume that both origin and destination are selector event loops. The Subscription.request(n) must be chained from the destination to the origin. This is now where each implementation can choose how to do this.

The following uses the pipe | character to signal async boundaries (queue and schedule) and R# to represent resources (possibly threads).

nioSelectorThreadOrigin | map(f) | filter(p) | consumeTo(toNioSelectorOutput)
-------------- R1 ----  | - R2 - | -- R3 --- | ---------- R4 ----------------

In this example each of the 3 consumers, map, filter and consumeTo asynchronously schedule the work. It could be on the same event loop (trampoline), separate threads, whatever.

nioSelectorThreadOrigin map(f) filter(p) | consumeTo(toNioSelectorOutput)
------------------- R1 ----------------- | ---------- R2 ----------------

Here it is only the final step that asynchronously schedules, by adding work to the NioSelectorOutput event loop. The map and filter steps are synchronously performed on the origin thread.

Or another implementation could fuse the operations to the final consumer:

nioSelectorThreadOrigin | map(f) filter(p) consumeTo(toNioSelectorOutput)
--------- R1 ---------- | ------------------ R2 -------------------------

All of these variants are "asynchronous streams". They all have their place and each has different tradeoffs including performance and implementation complexity.

The Reactive Streams contract allows implementations the flexibility to manage resources and scheduling and mix asynchronous and synchronous processing within the bounds of a non-blocking, asynchronous, dynamic push-pull stream.
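The fused variant `origin | map(f) filter(p) consumeTo(...)` can be approximated with the JDK's `SubmissionPublisher`, which hands elements to its subscribers through an `Executor`. This is a sketch rather than a recommendation: `FusedPipeline` is a hypothetical name, the map and filter functions are arbitrary examples, and `SubmissionPublisher.submit` may block the submitting thread when a subscriber's buffer is full, which is a JDK design choice and not something the specification prescribes.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

final class FusedPipeline {
    private FusedPipeline() {}

    /** origin | map(x -> x * 10) filter(x -> x % 20 == 0) consumeTo(sink) */
    static List<Integer> run(List<Integer> input) {
        ExecutorService r2 = Executors.newSingleThreadExecutor(); // resource R2
        List<Integer> sink = new CopyOnWriteArrayList<>();
        SubmissionPublisher<Integer> origin =
            new SubmissionPublisher<>(r2, Flow.defaultBufferSize());
        try {
            // Everything inside the consumer runs on R2, after the async boundary.
            CompletableFuture<Void> done = origin.consume(x -> {
                int mapped = x * 10;                   // map(f)
                if (mapped % 20 == 0) {                // filter(p)
                    sink.add(mapped);                  // consumeTo(...)
                }
            });
            input.forEach(origin::submit);             // R1: the calling thread
            origin.close();                            // signals onComplete
            done.join();                               // wait until R2 has seen onComplete
        } finally {
            r2.shutdown();
        }
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of(1, 2, 3, 4))); // [20, 40]
    }
}
```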

In order to allow fully asynchronous implementations of all participating API elements—Publisher/Subscription/Subscriber/Processor—all methods defined by these interfaces return void.

Subscriber controlled queue bounds

One of the underlying design principles is that all buffer sizes are to be bounded and these bounds must be known and controlled by the subscribers. These bounds are expressed in terms of element count (which in turn translates to the invocation count of onNext). Any implementation that aims to support infinite streams (especially high output rate streams) needs to enforce bounds all along the way to avoid out-of-memory errors and constrain resource usage in general.

Since back-pressure is mandatory the use of unbounded buffers can be avoided. In general, the only time when a queue might grow without bounds is when the publisher side maintains a higher rate than the subscriber for an extended period of time, but this scenario is handled by backpressure instead.

Queue bounds can be controlled by a subscriber signaling demand for the appropriate number of elements. At any point in time the subscriber knows:

  • the total number of elements requested: P
  • the number of elements that have been processed: N

Then the maximum number of elements that may arrive—until more demand is signaled to the Publisher—is P - N. In the case that the subscriber also knows the number of elements B in its input buffer then this bound can be refined to P - B - N.
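As a worked example of these bounds, the sketch below tracks P, N and B for a single subscriber. `DemandWindow` and its method names are hypothetical, and the class is not thread-safe; it only illustrates the arithmetic.

```java
/** Tracks requested (P), processed (N) and buffered (B) element counts. */
final class DemandWindow {
    private long requested; // P
    private long processed; // N
    private long buffered;  // B

    void request(long n) { requested += n; }

    void received() { buffered++; }                 // an onNext arrived and was queued

    void processedOne() { buffered--; processed++; } // an element left the buffer

    /** P - N: the bound when the buffer size is unknown. */
    long upperBoundWithoutBuffer() { return requested - processed; }

    /** P - B - N: the refined bound. */
    long mayStillArrive() { return requested - buffered - processed; }

    public static void main(String[] args) {
        DemandWindow w = new DemandWindow();
        w.request(8);                                   // P = 8
        for (int i = 0; i < 5; i++) w.received();       // 5 elements arrived
        for (int i = 0; i < 3; i++) w.processedOne();   // N = 3, B = 2
        System.out.println(w.upperBoundWithoutBuffer()); // 5
        System.out.println(w.mayStillArrive());          // 3
    }
}
```

With 8 requested and 5 already received, exactly 3 more elements can arrive before new demand is signalled, which is what P - B - N yields.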

These bounds must be respected by a publisher independent of whether the source it represents can be backpressured or not. In the case of sources whose production rate cannot be influenced—for example clock ticks or mouse movement—the publisher must choose to either buffer or drop elements to obey the imposed bounds.
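For a source that cannot be slowed down, the "drop" option might look like the following sketch. `DroppingGate` is a hypothetical helper, not part of the API; it assumes a single source thread calls `offer` (so that onNext stays serial) and that `request` is only called with positive values.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

/** Forwards an element only when demand is available, otherwise drops it. */
final class DroppingGate<T> {
    private final AtomicLong demand = new AtomicLong();
    private final AtomicLong dropped = new AtomicLong();
    private final Consumer<? super T> onNext;

    DroppingGate(Consumer<? super T> onNext) {
        this.onNext = onNext;
    }

    /** Adds demand, saturating at Long.MAX_VALUE. Assumes n > 0. */
    void request(long n) {
        demand.getAndUpdate(d -> d + n < 0 ? Long.MAX_VALUE : d + n);
    }

    /** Called by the uncontrollable source, e.g. on every clock tick. */
    boolean offer(T item) {
        for (;;) {
            long d = demand.get();
            if (d == 0) {
                dropped.incrementAndGet();   // no demand: obey the bound by dropping
                return false;
            }
            if (demand.compareAndSet(d, d - 1)) {
                onNext.accept(item);         // one unit of demand consumed
                return true;
            }
        }
    }

    long droppedCount() {
        return dropped.get();
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        DroppingGate<String> gate = new DroppingGate<>(seen::add);
        gate.offer("tick-1");                // dropped, nothing requested yet
        gate.request(2);
        gate.offer("tick-2");
        gate.offer("tick-3");
        gate.offer("tick-4");                // dropped, demand used up
        System.out.println(seen + " dropped=" + gate.droppedCount()); // [tick-2, tick-3] dropped=2
    }
}
```

A buffering variant would queue the element instead of incrementing `dropped`, with the queue itself bounded so the overall bound still holds.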

Subscribers signaling a demand for one element after the reception of an element effectively implement a Stop-and-Wait protocol where the demand signal is equivalent to acknowledgement. By providing demand for multiple elements the cost of acknowledgement is amortized. It is worth noting that the subscriber is allowed to signal demand at any point in time, allowing it to avoid unnecessary delays between the publisher and the subscriber (i.e. keeping its input buffer filled without having to wait for full round-trips).
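The amortization argument can be made tangible by counting demand signals. The sketch below simulates a subscriber that requests a fixed batch and re-requests once the batch is used up; `DemandSignals` and `requestSignals` are hypothetical names, and the model ignores timing entirely.

```java
/** Counts how many request(n) calls are needed to receive a given number of elements. */
final class DemandSignals {
    private DemandSignals() {}

    static int requestSignals(int elements, int batch) {
        int signals = 1;            // the initial request(batch)
        int outstanding = batch;
        for (int i = 0; i < elements; i++) {
            outstanding--;          // one onNext arrives
            if (outstanding == 0) { // window used up: acknowledge with more demand
                outstanding = batch;
                signals++;
            }
        }
        return signals;
    }

    public static void main(String[] args) {
        System.out.println(requestSignals(100, 1));  // 101 (stop-and-wait)
        System.out.println(requestSignals(100, 32)); // 4
    }
}
```

A real subscriber would typically replenish before the window is fully used up, so the publisher never has to wait for a round-trip.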

Legal

This project is a collaboration between engineers from Kaazing, Lightbend, Netflix, Pivotal, Red Hat, Twitter and many others. This project is licensed under MIT No Attribution (SPDX: MIT-0).

reactive-streams-jvm's People

Contributors

2m, akarnokd, angelsanz, anthonyvdotbe, benjchristensen, bjornhamels, briantopping, colinrgodsey, davidmoten, douglea, egetman, jakewharton, jroper, kiiadi, ktoso, ldaley, olegdokuka, ouertani, patriknw, retronym, rkuhn, rstoyanchev, savulchik, scottmitch, seratch, shenghaiyang, smaldini, sullis, tomislavhofman, viktorklang

reactive-streams-jvm's Issues

OnSubscribe vs return Subscription

The mechanism for receiving a Subscription is worth discussion as there are differing views on this.

Currently it is passed back from the Publisher to the Subscriber:

public interface Publisher<T> {
    public void subscribe(Subscriber<T> s);
}

public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
}

It could however be like this:

public interface Publisher<T> {
    public Subscription subscribe(Subscriber<T> s);
}

Or as shown here it could be:

public interface Publisher<T> {
    public Subscription<T> subscribe();
} 

public interface Subscription <T> {
    public void request(Subscriber<T> reader, int n);
    public void close();
}

Temporarily Remove TCK Implementation

I propose temporarily removing the TCK implementation and that we stop working on it until we arrive at a fairly stable agreement on the API and contract.

There is significant discussion going on in #19 and #37 that is changing the contract, and I expect further changes beyond these. The TCK impedes rapid iteration and confuses things until the contract is clearly defined.

Do we have agreement to delete what's currently there (and about to be obsolete) until we reach an agreement on contract?

Is a Subscriber required to be thread-safe?

While implementing some samples, I started thinking about this requirement. The README says that publishers MUST not send concurrent notifications, however is the Subscriber required to be thread-safe in regards to visibility concerns?

For example, the Subscription passed to onSubscribe must be stored in a reference. Does this reference need to be volatile?

Or another example - Subscription.cancel() is async and might not cancel the stream right away. To ensure that onNext does not happen after a Subscription.cancel() one might want to store and use a boolean variable (e.g. isComplete) - and this need for example arises when user code is executed that triggers an exception, thus triggering in the pipeline an onError signal downstream and a cancelation of upstream.

I'm asking because if you build a graph of Publishers/Subscribers (e.g. Rx), then multi-threading concerns can only be addressed at asynchronous boundaries and thus not pay the price for each node in that graph. Like if you build an Rx implementation on top of this specification, operators like map() and filter() are synchronous in nature and wouldn't need synchronization if the upstream publisher would be well behaved.
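To make the question concrete, here is a minimal sketch of a synchronous map stage that keeps its state in plain, non-volatile fields. It is only correct under the assumption the question is asking about, namely that upstream delivers its signals serially with proper visibility between them. The class name `MapSubscriber` is hypothetical, and the sketch uses the JDK's `java.util.concurrent.Flow` interfaces (JDK 9+), which mirror the Reactive Streams ones, so that it compiles without extra dependencies.

```java
import java.util.concurrent.Flow;
import java.util.function.Function;

// Hypothetical synchronous map stage; only the Flow interfaces come from the JDK.
final class MapSubscriber<T, R> implements Flow.Subscriber<T> {
    private final Flow.Subscriber<? super R> downstream;
    private final Function<? super T, ? extends R> mapper;

    // Plain (non-volatile) fields: safe only if upstream signals arrive
    // serially, which is the assumption under discussion in this issue.
    private Flow.Subscription upstream;
    private boolean done;

    MapSubscriber(Flow.Subscriber<? super R> downstream,
                  Function<? super T, ? extends R> mapper) {
        this.downstream = downstream;
        this.mapper = mapper;
    }

    @Override public void onSubscribe(Flow.Subscription subscription) {
        upstream = subscription;
        downstream.onSubscribe(subscription); // demand passes straight through
    }

    @Override public void onNext(T item) {
        if (done) return; // ignore elements that arrive after a failure
        R mapped;
        try {
            mapped = mapper.apply(item);
        } catch (RuntimeException e) {
            done = true;
            upstream.cancel();      // cancel upstream (may take effect late)
            downstream.onError(e);  // and signal the failure downstream
            return;
        }
        downstream.onNext(mapped);
    }

    @Override public void onError(Throwable t) {
        if (done) return;
        done = true;
        downstream.onError(t);
    }

    @Override public void onComplete() {
        if (done) return;
        done = true;
        downstream.onComplete();
    }
}
```

If the upstream publisher were allowed to signal concurrently, both `upstream` and `done` would need synchronization, which is exactly the per-node cost the issue wants to avoid.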

2.11 Build

While testing out Reactive Streams, I've had trouble with conflicting cross-version suffixes of dependencies, caused by Reactive Streams being built against Scala 2.10.

Would it be possible to release a 2.11 build?

Thank you

Hot vs cold.

Hope this is not the wrong place to post this, but I can't find a forum/group to ask this, so here goes:

Since hot vs cold seems to be a source of confusion when dealing with streams, and since such implementations differ, would it not make sense to distinguish them in the API?

E.g. currently we have the subscribe method, which could instead be named start (for cold) and pickUp or resume (for hot).

If I'm completely off here, please help me understand why that's not necessary (or not possible).

Supporting byte streams

I'm concerned that the API doesn't support a very common use case: streams of bytes. (Or, less commonly, chars, ints, bits, etc.) These types all have in common the fact that they are transported not one by one, but in arrays, buffers, strings, etc. And these buffer types don't have size limits as part of the type.

This creates a problem: if a Subscriber[ByteString] calls request(1), it will get one more ByteString - but of unknown size. This conflicts with the basic requirement of back pressure. Imagine a stream with an 'unzip' processor; a naive implementation would create zip bombs.

The programmer could manually configure all components in a reactive stream pipeline to emit arrays within min-max size limits. But this would mean hand-tuning for performance (since optimal buffer size varies with component), instead of relying on automatic back pressure communication.

Worse, if the programmer doesn't control both ends of a channel, he won't be able to rely on the behavior of the other side - which might be written using a different Reactive Streams implementation, a different language, or be across a network. This also limits the usability of a language-specific custom type like a size-limited ByteString.

I think this use case will be common, so I'm suggesting it should be addressed in the API. What do you think?
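For reference, the manual workaround mentioned above (emitting arrays within a size limit) could look roughly like the sketch below. `BoundedChunks` and `maxChunkSize` are hypothetical names, not part of any Reactive Streams API. With such a splitter in front of a publisher, `request(n)` bounds the outstanding data to at most `n * maxChunkSize` bytes, but the limit is hand-configured rather than communicated through back pressure.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: splits a buffer into chunks of bounded size.
final class BoundedChunks {
    static List<byte[]> split(byte[] data, int maxChunkSize) {
        if (maxChunkSize <= 0) {
            throw new IllegalArgumentException("maxChunkSize must be positive");
        }
        List<byte[]> chunks = new ArrayList<>();
        int from = 0;
        while (from < data.length) {
            // The last chunk may be shorter than maxChunkSize.
            int length = Math.min(maxChunkSize, data.length - from);
            chunks.add(Arrays.copyOfRange(data, from, from + length));
            from += length;
        }
        return chunks;
    }
}
```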

Multicast Requirement

Currently the spec states "A Publisher can serve multiple subscribers subscribed dynamically at various points in time. In the case of multiple subscribers the Publisher should respect the processing rates of all of its subscribers (possibly allowing for a bounded drift between them)."

I think it is a mistake to complicate the spec by requiring implementations to support multicasting, and therefore management of subscriptions over time. In fact, I think it should expressly stick to unicast (new lifecycle per Subscription).

Multicasting techniques should be layered on top by libraries, not required of the Publisher instances themselves.

For example, each Subscriber could result in a new network connection, open file, etc. This would be a basic implementation.

In Rx this greatly simplifies things and is a good separation of concern. Multicasting can be added and done with different behaviors such as replaying all of history, a portion of history, the last value only, or ignoring history and just starting from now onwards, etc.

In other words, someone providing me a Publisher should not concern themselves with multiple subscriptions, how I want to multicast or other such things. If I subscribe it should start a new lifecycle, emit the data as I request it and stop when I unsubscribe.

This keeps the mental model clear, the Publisher implementations simple and allows libraries a simple contract to interface with.
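A minimal sketch of the unicast model argued for here: every call to subscribe creates fresh state, so two subscribers never share a lifecycle. `UnicastRange` and `collect` are hypothetical names, and the sketch uses the JDK's `java.util.concurrent.Flow` interfaces (JDK 9+) as a stand-in for the Reactive Streams ones. It is synchronous and skips validation of non-positive demand, so it illustrates the idea and is not a conforming publisher.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical unicast publisher emitting 1..count, with new state per subscriber.
final class UnicastRange implements Flow.Publisher<Integer> {
    private final int count;

    UnicastRange(int count) { this.count = count; }

    @Override public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        // No subscriber registry: the publisher acts as a factory only.
        subscriber.onSubscribe(new Flow.Subscription() {
            private int next = 1;   // per-subscription cursor
            private boolean done;

            @Override public void request(long n) {
                for (long i = 0; i < n && !done && next <= count; i++) {
                    subscriber.onNext(next++);
                }
                if (!done && next > count) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override public void cancel() { done = true; }
        });
    }

    // Helper for the usage example: requests `demand` elements once and records them.
    static List<Integer> collect(Flow.Publisher<Integer> publisher, long demand) {
        List<Integer> received = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(demand); }
            @Override public void onNext(Integer item) { received.add(item); }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        });
        return received;
    }
}
```

Calling `UnicastRange.collect(p, 2)` and then `UnicastRange.collect(p, 10)` on the same `new UnicastRange(3)` starts from 1 both times, because the second subscriber gets its own Subscription. Multicast behavior such as replay would be layered on top by a library.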

Questions on API contract regarding onComplete/onError

Unfortunately the specification is not up-to-date, I haven't been following the whole discussion and the examples given are incomplete, so in trying to implement simple publishers / subscribers, the following questions came to mind:

  1. can a Publisher emit onComplete / onError without being asked ... as in, when the Subscriber does Subscription.request(n), does this n count include the ensuing onComplete / onError?

Example:

// given a subscriber that does this:
subscription.request(1);

// can a publisher do this in response to the above or does it have 
// to wait for another request(1) for triggering onComplete?
subscriber.onNext(something);
subscriber.onComplete();

  2. Error handling - I don't remember too many mentions on how errors should be handled, so what happens if an onNext(elem) fails? Should the publisher's logic catch it and trigger an onError(ex) event? In the Rx design guidelines, this is said to trigger weird effects.
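To illustrate the first question, the sketch below shows one possible reading, in which request(n) counts only onNext signals and the terminal onComplete needs no demand of its own. To my knowledge this is the reading the released specification settled on, but treat that as an assumption here. `SingleItemSketch`, `just` and `trace` are hypothetical names, and the JDK's `java.util.concurrent.Flow` interfaces (JDK 9+) stand in for the Reactive Streams ones.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class SingleItemSketch {
    // Hypothetical publisher of exactly one element.
    static <T> Flow.Publisher<T> just(T item) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean emitted;
            private boolean cancelled;

            @Override public void request(long n) {
                // Simplification: non-positive demand is ignored instead of reported.
                if (emitted || cancelled || n <= 0) return;
                emitted = true;
                subscriber.onNext(item);
                // Assumption: the terminal signal does not consume demand,
                // so it follows directly without waiting for another request.
                if (!cancelled) subscriber.onComplete();
            }

            @Override public void cancel() { cancelled = true; }
        });
    }

    // Subscribes with request(1) and records the signals it sees.
    static List<String> trace(Flow.Publisher<String> publisher) {
        List<String> events = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(1); }
            @Override public void onNext(String item) { events.add("onNext(" + item + ")"); }
            @Override public void onError(Throwable t) { events.add("onError"); }
            @Override public void onComplete() { events.add("onComplete"); }
        });
        return events;
    }
}
```

Under the other reading, the publisher would have to hold back onComplete until a second request(1) arrives, which forces every subscriber to over-request by one just to learn that the stream has ended.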

Release 0.4.0.M2

I propose to cut the next milestone after the following PRs/Issues are resolved:

#62 (potentially: #82)
#80
#81
#63

What say you @reactive-streams/contributors

TCK Upgrade Story

This issue came up during discussions on the "bring back the TCK" pull request: #91

In general we need some story for when we change the spec and the tests, so that implementors are notified about changed (perhaps optional!) behaviour. In the TCK, optional tests are currently skipped if they fail to verify. Optional behaviour added in 0.x+1 could therefore simply be skipped instead of verified, and the implementor may assume "oh good, my impl covers everything". This is not limited to added optional tests; it applies equally to behaviour which we can't test for.

This issue is meant to start a discussion about whether we want to provide more info for implementors in the TCK itself or simply assume implementors will read changelogs and apply all changes.

One idea on the more extreme side would be that each spec would contain a value (conformsTo() { return Spec_0_4; }); then, if the dependency is updated to 0.5, we fail the tests, listing what has changed... This might be overkill, and simply "please read the change logs" could be our solution here.
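A rough sketch of how the conformsTo() idea might work. Everything here is hypothetical: the `SpecVersion` enum, the `SpecConformance` interface, the `ConformanceCheck` helper and the change notes are placeholders invented for illustration, and none of them exist in the TCK.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical spec versions, each carrying a placeholder change note.
enum SpecVersion {
    SPEC_0_4("baseline for this sketch"),
    SPEC_0_5("placeholder change note for 0.5");

    final String changeNote;

    SpecVersion(String changeNote) { this.changeNote = changeNote; }
}

// Hypothetical hook an implementor's verification class would provide.
interface SpecConformance {
    SpecVersion conformsTo();
}

final class ConformanceCheck {
    // Lists the change notes of every version that is newer than the one the
    // implementor declared but already covered by the TCK on the classpath.
    static List<String> unacknowledgedChanges(SpecConformance suite, SpecVersion tckVersion) {
        List<String> notes = new ArrayList<>();
        for (SpecVersion version : SpecVersion.values()) {
            boolean newerThanDeclared = version.compareTo(suite.conformsTo()) > 0;
            boolean coveredByTck = version.compareTo(tckVersion) <= 0;
            if (newerThanDeclared && coveredByTck) {
                notes.add(version + ": " + version.changeNote);
            }
        }
        return notes;
    }
}
```

A TCK built this way would fail fast whenever the returned list is non-empty, forcing the implementor to read the notes and bump conformsTo() before the rest of the suite runs.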

I'm interested if you guys see this as a problem that needs solving or we simply document that "if you upgrade, please do read the changelog in detail".

Consumer.getSubscriber?

If Subscriber is considered part of the SPI and never intended for use by the user, why is it exposed on Consumer?

public Subscriber<T> getSubscriber();

In particular, what would be the point of ever retrieving it? There are no methods of use to invoke from a consumer perspective. In fact, if any are invoked they are now altering the data flow.

The javadoc states "This method should only be used by implementations of this API."

Since an implementation would have access to the concrete types and be capable of getting this anyway, it seems that it should not be on the Consumer type, where it will confuse and allow incorrect behavior.

Remove Processor Interface

What is the purpose of Processor being in the API?

Based on my reading of the code and understanding of the intents of this API, I suggest removing the Processor class: https://github.com/reactive-streams/reactive-streams/blob/master/spi/src/main/java/org/reactivestreams/api/Processor.java

It does not seem relevant to the purpose of this project. Libraries such as Akka, RxJava, Reactor, Finagle etc can and will layer logic on top of the streams of data and integrate via the Subscriber class to receive and transform data.

define what “heavy computation” means for the spec

The consensus on updating the asynchronicity semantics of the Subscriber–Subscription relationship established in #46 involves the term “heavy computation” as describing some action that SHOULD NOT (as per RFC2119) be performed within callbacks (like onNext and request). We need to properly define what we mean by this classification and then incorporate that (and the reasoning behind) in the specification.

Support Requesting Infinite (firehose)

I suggest modifying the spec to allow signaling desire for unrestricted emission of data.

This was brought up at https://github.com/reactive-streams/reactive-streams/pull/61/files#r13451851

Currently the spec permits only positive numbers passed to request(n). I suggest accepting a sentinel such as request(-1) to mean "send as much as you want as fast as you want".

The two use cases I'm aware of where this would be valuable are:

1) Time Based Processors

If a Processor is doing sampling, throttling, windowing, debouncing or other such time based work then calling request(n) doesn't make sense. It wants to receive everything without any artificial delays.

2) Synchronous Processing

If my Subscriber is processing synchronously (natural backpressure, not blocking threads, just CPU-bound) then there is no need for the request(n) back-and-forth and overhead of tracking this counter.

In both of the above cases the current spec requires doing something like request(Integer.MAX_VALUE) and decrementing on every onNext and invoking request again occasionally. This adds unnecessary overhead. It also means that the Publisher/Subscription has no hint that it could optimize and just firehose the data. This affects the implementation and a Publisher can have a "fast-path" for cases where request(-1) is invoked that doesn't involve queuing, pausing or bookkeeping at all.
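The bookkeeping described above looks roughly like the following sketch: request a batch, count down on every onNext, and top up when the outstanding demand runs low. `ReplenishingSubscriber` and its `batch` parameter are hypothetical, and the JDK's `java.util.concurrent.Flow` interfaces (JDK 9+, where request takes a long) stand in for the Reactive Streams ones.

```java
import java.util.concurrent.Flow;

// Hypothetical subscriber showing the demand counter a "firehose" consumer
// has to maintain when an unbounded request is not available.
final class ReplenishingSubscriber<T> implements Flow.Subscriber<T> {
    private final long batch;
    private long outstanding;   // requested but not yet delivered
    private long received;
    private Flow.Subscription subscription;

    ReplenishingSubscriber(long batch) {
        if (batch <= 0) throw new IllegalArgumentException("batch must be positive");
        this.batch = batch;
    }

    @Override public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        outstanding = batch;
        s.request(batch);
    }

    @Override public void onNext(T item) {
        received++;
        outstanding--;
        // Top up once half of the batch has been consumed.
        if (outstanding <= batch / 2) {
            long topUp = batch - outstanding;
            outstanding = batch;
            subscription.request(topUp);
        }
    }

    @Override public void onError(Throwable t) { }

    @Override public void onComplete() { }

    long received() { return received; }
}
```

With a sentinel meaning "unbounded", both the counter and the repeated request calls disappear on the subscriber side, and the publisher learns up front that it may skip its own accounting.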

Contract/Usage for Subscription.requestMore

Reading the published spec, it's very unclear to me in what way Subscription.requestMore should be used.

For example under the paragraph on Subscriber.onNext it says that "must not call any methods on the Subscription".

Is that a typo? Why does onSubscribe exist if not for passing a reference such that the Subscriber can use it? I assume you didn't mean to say that :-)

Also, the Publisher is definitely expecting requestMore(nr) to start producing elements. Can't find the paragraph that mentions that. So a well behaved Subscriber should requestMore.

Also, what happens if for certain Publisher implementations / combinators / whatever, this mechanism isn't possible? This means that the Subscriber should use requestMore, but shouldn't count on it to actually work, right?

This gets really problematic when you want to define some operator that turns one or more Publishers[T] into a Publisher[U]. I mean, when implementing say zip, if you could rely on the source publishers being well behaved in regards to back-pressure actually working, then buffering multiple elements wouldn't be needed, however I don't think you can necessarily assume that - so something along the chain of transformations needs to either drop or buffer the incoming surplus events.

Replace build.sbt

The source files have been translated from Scala to Java, but the build still relies on Scala's sbt.

I propose switching to the older Maven build tool.

Subscription.requestMore

Any particular reason for the "more" in requestMore?

public void requestMore(int elements);

The "more" seems redundant, and on the first invocation, when nothing has yet been requested, it is simply wrong.

Perhaps it's cleaner to just say:

public void request(int elements);

clarify meaning of “overflow protection”

@benjchristensen proposed

  • A Subscription MUST support an infinite number of calls to request and MUST support a pending request count up to 2^63-1 (java.lang.Long.MAX_VALUE). If more than 2^63-1 are requested in pending then Subscription MUST emit an error to Subscriber.onError.
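One way to read the proposed rule is as an overflow check when adding to the pending demand. The sketch below is a hypothetical, single-threaded helper (`PendingDemand` is not part of any API); a real Subscription would need an atomic variant and would call Subscriber.onError when `add` reports an overflow.

```java
// Hypothetical, single-threaded demand counter illustrating the proposed rule.
final class PendingDemand {
    private long pending; // invariant: 0 <= pending <= Long.MAX_VALUE

    // Adds n to the pending demand. Returns false, leaving the counter
    // unchanged, if the total would exceed 2^63-1; under the proposal the
    // Subscription would then have to signal onError.
    boolean add(long n) {
        if (n <= 0) throw new IllegalArgumentException("n must be positive");
        long sum = pending + n;
        if (sum < 0) return false; // wrapped around past Long.MAX_VALUE
        pending = sum;
        return true;
    }

    long pending() { return pending; }
}
```

Since `pending` is never negative and `n` is positive, a negative sum can only come from wrap-around, so the sign check is sufficient to detect the overflow.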

Release 1.0.RC1

What needs to happen in order for us to be able to ship a release candidate of Reactive Streams 1.0?

Thoughts:

  1. We first have to release #83
  2. We need to port/create the TCK
  3. We need to update and elaborate on the documentation
  4. We need to have 1, preferably more, implementations ready that pass the TCK so we know we're covering most/all the bases with the spec

@reactive-streams/contributors please add thoughts as comments

TCK support for restricted length Producers and Consumers

I know the 0.3 TCK is out of date and a new TCK will be written at some point. Please consider this a feature request for any new TCK.

The 0.3 TCK has support for verifying Producers and Consumers. I'll talk about Producer testing here, but the same idea applies to Consumer testing too.

Users of the ProducerVerificationTest need to implement createPublisher(elements: Int) and construct a Publisher of length elements (with an argument of 0 meaning a stream of infinite length).

In my situation I wasn't able to satisfy the requirements of this method because the Producer I was creating only supported streams with exactly 1 element.

What I'd like to suggest is that, in any future TCK, there are options to specify the valid lengths of the stream that is being tested. For example, some streams might support only 0, 1 or finite lengths.

Polyglot Support

I suggest expanding this initiative beyond the JVM since most of us need our data streams and systems to interact over network boundaries with other languages.

Thus, it seems it's actually more important to define the protocol and contract and then allow each language platform to define the interfaces that meet it.

Perhaps an approach to this is breaking out into multiple sub projects such as:

  • /reactive-streams/specification (purely for contract definition)
  • /reactive-streams/reactive-streams-jvm
  • /reactive-streams/reactive-streams-dotnet
  • /reactive-streams/reactive-streams-javascript
  • /reactive-streams/reactive-streams-websockets
  • /reactive-streams/reactive-streams-tcp
  • /reactive-streams/reactive-streams-udp

Even if the focus in the short-term remains on the JVM interface design, we would gain a lot by including communities such as Javascript/Node.js, Erlang, .Net, banking and financial trading (who have been doing high performance messaging for decades). It would also make the model far more useful as we could then consume a reactive stream from Javascript in a browser via WebSockets powered by Netty or Node.js receiving data from Rx/Akka/Reactor/whatever and it would "just work".

remove unnecessary Processor rules

4.2 (cancel upstream after last downstream has canceled) and footnote 1 (must be prepared to start receiving before there are downstream subscribers) are not needed as the behavior is sufficiently specified by the provisions of 4.1 and 4.3.

add provisions for failing `onError`

discussion was:

@benjchristensen 2.13

We say nothing about onError failing and throwing an exception. We should add something like:

A failing onError (e.g. throwing an exception) is a specification violation and the Publisher or Subscription MUST throw (or wrap and re-throw) the thrown exception.

@smaldini

Well that could be a note but it didn't enforce anything so I didn't think about leaving it. WDYT ? Note or Rule ?

@benjchristensen

I think it's better to call this one out explicitly as it is important for people to have clarity on how the edge cases of error handling should be handled. In other words, I see it as proactively avoiding the FAQ about what "the right thing to do" is for this case.

A note would be fine, but since we already have 1.13, it feels natural in reading order to have the very next one explain what to do if onError fails, thus I'd suggest putting it as a rule ... it just happens to be a rule that you achieve naturally as long as you don't do anything actively wrong (swallowing errors for example).

@rkuhn

We need to define what it means for the Publisher–Subscriber (and possibly Subscription if one was created) for onError to throw, but defining the part of the behavior that is external to this relationship cannot meaningfully be covered by this spec—requiring to re-throw is only valid in certain scenarios, other implementations might have different mechanisms in place to deal with this. My proposal is:

A failing onError invocation is a violation of the specification. In this case the Publisher MUST consider a possible Subscription for this Subscriber as canceled. The Publisher SHOULD raise this error condition in a fashion that is adequate for the runtime environment (e.g. by throwing an exception, notifying a supervisor, logging, etc.).
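The last proposal could be implemented on the publisher side along the lines of the sketch below. `GuardedErrorSignal` and its `reporter` callback are hypothetical; the callback stands for whatever mechanism is adequate for the runtime environment (logging, a supervisor, rethrowing). The JDK's `java.util.concurrent.Flow` interfaces (JDK 9+) are used in place of the Reactive Streams ones.

```java
import java.util.concurrent.Flow;
import java.util.function.Consumer;

// Hypothetical publisher-side guard around the onError signal.
final class GuardedErrorSignal {
    private final Consumer<Throwable> reporter; // environment-specific reporting
    private boolean cancelled;

    GuardedErrorSignal(Consumer<Throwable> reporter) {
        this.reporter = reporter;
    }

    void signalError(Flow.Subscriber<?> subscriber, Throwable cause) {
        if (cancelled) return;
        // Terminal signal: the subscription counts as cancelled from here on,
        // whether or not the subscriber handles the error cleanly.
        cancelled = true;
        try {
            subscriber.onError(cause);
        } catch (Throwable violation) {
            // A throwing onError is a spec violation. Report it instead of
            // letting it escape. A real implementation may prefer to let
            // fatal errors such as OutOfMemoryError propagate.
            reporter.accept(violation);
        }
    }

    boolean isCancelled() { return cancelled; }
}
```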

Use of SBT for Builds

Anyone else having issues with SBT? I am spending more time fighting SBT than building.

Since this is a Java project, any reason to not stick with Maven or Gradle?

Clarification of “Asynchronous Boundary”

Introduction

It was from the very beginning that we agreed upon the following basic problem as the prime motivator for the Reactive Streams project: “How do we pass a stream of data items across an asynchronous boundary without requiring lossiness, unbounded buffering or blocking?”

The first two items come from the fact that without the ability to express back pressure the recipient has to either buffer or drop items (since it is impossible to guarantee that the recipient will always be faster than the sender—given that the scenario involves an asynchronous boundary).

The last item comes from the desire to keep both parties of the exchange responsive to external inputs during the whole process. Imagine you want to implement a stream processing component that does conflation (e.g. taking in stock ticks and handing out the most up-to-date prices to potentially slower consumers). If signaling the consumers would block, then the ticks arriving in the meantime could not be handled and conflation could not be implemented.

These are to my understanding the points that we all agree upon already. We probably are in agreement about the following, but rather than assume I would like to confirm this understanding.

Problem Statement

Asynchronous boundaries imply a certain cost—as witnessed by this project—and therefore should only be applied when needed. The crucial question is what the reasons are that are driving the need for asynchrony, and the answer boils down to variations of a common theme: asynchrony is needed for decoupling two components.

Components that interact synchronously cannot meaningfully be decoupled, consider two processes where one asks the other for something and synchronously expects the answer back. If the second process does not provide an answer, then the first process is broken (might hang indefinitely or fail surprisingly). In systems that are built with responsiveness in mind, there is only one scenario in which the answer remains outstanding: the second process has failed unexpectedly. Synchronous interaction then leads to cascading failures across component boundaries, which means that the components are not properly encapsulated and isolated from each other.

Decoupling two components does not only mean stopping failures from propagating in this direct fashion, it also is crucial in separating their resource usage. The trivial example is running different parts of an application on different machines, allocating specific resources to them. This introduces asynchrony by virtue of requiring network interaction, and we all agree that papering over that by simply blocking until the network comes back with a reply is unacceptable (including for the reason given in the previous paragraph). Staying in this picture, we have component A and component B for which there are distinct resource allocations (e.g. separate machines), and now our mission statement reads “how do we pass a stream of data items from A to B without requiring A to block or B to drop or buffer unboundedly?”.

This very same problem applies whether A and B are living inside the same JVM or not, since we can run them on different thread pools which we could (using native libraries) even pin to different CPU sockets, purposefully giving them fairly well separated resources (I would love to be able to separate memory allocations as well, but that is not needed to make this point). The reason why we do this is to encapsulate and isolate the two components such that misbehavior in A does not lead to failure in B and the other way around. In other words, decoupling A and B means removing their ability to interfere with each other down to the minimum—the actual data items passed across the boundary between them.

Conclusion

With this argument we are in a position to clarify that the purpose of an asynchronous boundary in the context of Reactive Streams is that the sender and the recipient of the data items that flow across this boundary are clearly separated in the resources that they use, which is a necessary prerequisite for decoupling. This is an important goal for complex systems, because only through decoupling can we keep the maintenance burden manageable.

Consequences

It is my understanding that the purpose of the Reactive Streams project is to govern the signaling of data and demand between decoupled components, and we agreed on several occasions that it deliberately does not concern itself with how the streams are processed within these components. Therefore it is imperative that we are very clear about this in the specification: all processing of the data items must occur inside the components, which means on the resources that are allocated to either of them.

This translates into the requirement that the specified methods (in particular onNext, onError, onComplete and request as the main data carriers) must be treated exclusively as signals to be sent across the asynchronous boundary, and no processing can happen while they are on the call stack.

Another consequence of the reasoning above is that if we want to solve the problem of decoupling components then the resulting specification will have to be limited to this scope. If we start adding behavior which is orthogonal to this goal, but opens up the specification in a way that breaks down when requiring isolation of sender and recipient, then we will not succeed. We should avoid the trap of trying to solve everything.

This means that the intention of making it easy to implement one side of the boundary (as has been suggested for the Subscriber end) cannot really work, since the requirement of decoupling sender and receiver implies that even a functionally simple transformation needs to have access to an asynchronous execution facility—I say functionally simple because for example a filter expression that is applied to items in onNext can consume arbitrary resources, and only excluding IO as a special form of blocking is not good enough.
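As a sketch of what "signals only, no processing on the call stack" could mean in code, the subscriber below does nothing in onNext except hand the element to an Executor that belongs to the receiving component. `HandOffSubscriber` and the `work` callback are hypothetical names, and the JDK's `java.util.concurrent.Flow` interfaces (JDK 9+) stand in for the Reactive Streams ones. It requests one element at a time, so at most one task is in flight and calls on the Subscription stay serial.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.function.Consumer;

// Hypothetical subscriber that treats onNext purely as a signal and runs the
// actual processing on the receiving component's own resources.
final class HandOffSubscriber<T> implements Flow.Subscriber<T> {
    private final Executor executor;          // resources of the receiving side
    private final Consumer<? super T> work;   // the actual processing
    private Flow.Subscription subscription;

    HandOffSubscriber(Executor executor, Consumer<? super T> work) {
        this.executor = executor;
        this.work = work;
    }

    @Override public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        s.request(1);
    }

    @Override public void onNext(T item) {
        // Only schedule here: the sender's thread returns immediately.
        executor.execute(() -> {
            work.accept(item);
            subscription.request(1); // ask for the next element when ready
        });
    }

    @Override public void onError(Throwable t) { }

    @Override public void onComplete() { }
}
```

The cost discussed in this issue is visible here: even a functionally trivial `work` now needs an execution facility on the subscriber side.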

Closing Remarks

This basic problem statement is the core upon which everything else is built, so we need to be very clear about it and have a complete common understanding in order to proceed. I expect that we will continue to have discussions about the precise consequences, but if we do not agree on what it is that we want to be building, then these discussions will not lead to consensus.

Move communication from publisher to subscriber into subscription.

I like how the communication of the subscriber to its publisher goes through the subscription, but why doesn't the communication in the other way go through that as well?

I guess it would require an additional (potentially) async confirmation from the subscriber to the publisher that it's received the subscription, and you couldn't signal errors to the subscriber if the subscription couldn't be created, but the advantage of that is that you'd be sure the only object that could call the subscriber callbacks is the publisher.

As it is now any code anywhere can send events to a subscriber, regardless of subscriptions.

Think of what was discussed in #53:

  • a Subscriber can assume single-threaded behavior and does not need to worry about volatile or other thread-safety issues
  • a Publisher must ensure thread-safe, sequential notification to a Subscriber whether it's on a single thread, or multiple threads. If from multiple threads, then it must serialize/synchronize to ensure non-concurrent notification, and thus it also satisfies the visibility concerns.

Libraries conforming to the spec would all have to add notes to their subscribers explaining a user should not call those methods unless they can serialize the calls with other publishers.

Release version 0.4.0

We're currently blocked waiting on the issues being discussed around "heaviness" (#54) and the onComplete/onError API contract to be settled before we can move forward with our Reactive Streams implementation which is currently dependent on version 0.3.0 artifacts.

Are we in a position yet where we can consolidate the changes we discussed into a new 0.4.0 artifact so we can move forward with our implementation? We have other projects that will need to use our Reactive Streams implementation and they are essentially waiting on the outcome of these discussions. The longer this drags out in discussions the behinder we get. :)

We seem to be in general agreement on the vast majority of items, with some differences of opinion in some of the finer points. But I don't see anything holding us up from moving forward with a TCK or whatever we plan to release for 0.4.0.

Thoughts?

Naming

This will be the debate of all debates. What to name everything.

At the time of creating this issue the names are:

public interface Publisher<T> {
    public void subscribe(Subscriber<T> s);
}

public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

public interface Subscription {
    public void request(int n);
    public void cancel();
}

Another proposal from @mariusaeriksen is:

public interface Source<T> {
    public Handle<T> open();
}

public interface Handle<T> {
    public void read(Reader<T> reader, int n);
    public void close();
}

public interface Reader<T> {
    public void onNext(T t);
    public void onCompleted();
    public void onError(Throwable t);
}

Common conventions include:

  • Publisher/Subscriber
  • Observable/Observer
  • Source/Sink
  • Listenable/Listener
  • Stream/Subscriber
  • Channel
  • Handler

The one principle I want to keep in mind is that if we are successful with this we will be looking at and using these types for a very long time, particularly if they end up in a future JDK.

Subscription.cancel

The current interface on Subscription includes

public void cancel();

The semantics of cancel suggests it is actually doing something to "cancel" work, when in reality that is unlikely. I suggest unsubscribe is semantically more correct since we are just removing the Subscription to stop receiving updates. Whether that results in any actions beyond stopping the flow of data is up to the implementation (for example, you can't cancel mouse events or stock prices, you just unsubscribe from receiving further notifications).

The use of unsubscribe better mirrors the subscribe counterpart as well:

public interface Publisher<T> {
  public void subscribe(Subscriber<T> subscriber);
}

public interface Subscription {
  public void unsubscribe();
}

compact 1.14–17

discussion was:

@rkuhn on 1.14

Please move the parenthesis out into a footnote and add more cases in which Subscribers may be rejected: when the Publisher cannot produce any more elements (exhausted a non-restartable source, shut down its resources; in general covering the cases from rules 15–17).

@benjchristensen on 1.15

How is a publisher in a "completed" state? A Subscription may be (it has already emitted a terminal event) and a Subscriber definitely can be, but a Publisher doesn't have such a state.

I suggest removing this point and just leaving 1.14 to cover this as a Publisher can onError if it wants to, but that has nothing to do with being in a "completed" state.

@viktorklang

A hot publisher can most definitely be in a completed state?

@benjchristensen

Then it would just onError when someone tries to subscribe. Nothing about the Publisher requires formalized states.

Item 1.14 covers everything that is needed for the use case you give. It is confusing to consider Publisher states when we support both hot and cold implementations.

@smaldini

I think the behavior stays covered in 1.14 as highlighted, Maybe we could move this into a note for reference "Stateful Publisher".

@benjchristensen

I'm fine with a "Stateful Publisher" section of notes if that is helpful.

@rkuhn

Agree with @benjchristensen here: the reasons why a Publisher wants to not be woken up again can be manifold, we don’t need to express one rule for each of them. My proposed solution is to remove 15–17 and move them into the footnote suggested for 14 above.

change “MUST reject” to RECOMMENDED in 1.12

discussion was:

@benjchristensen

Why is this up to the Publisher to enforce this? That requires bookkeeping in an aggregated data structure whereas normally a unicast Publisher/Subscription will only ever know about the single Subscriber it is currently associated with. The Publisher shouldn't have to track every Subscriber it ever saw and then deal with the cleanup. This adds concurrency and memory overhead.

I would change "It MUST reject the Subscription" to "It MAY reject the Subscription" and have something under the Subscriber in section 2 that states that it can't subscribe more than once with a single Subscriber instance.

Updated ... section 2.5 already addresses this requirement.

@viktorklang

I'm not sure I understand your concern, there needs to be a link between publisher and subscriber anyway, via the subscription?

@benjchristensen

No there isn't. There is a link between the Subscriber and Subscription. There is no requirement for the Publisher to maintain any reference at all to either of those instances after the Publisher.subscribe call.

This simple example demonstrates how the Publisher is just a factory and never retains a reference to the Subscriber or Subscription: https://github.com/reactive-streams/reactive-streams/blob/master/api/src/examples/java/org/reactivestreams/example/unicast/InfiniteIncrementNumberPublisher.java

This requirement would force the Publisher to maintain a data structure with references to every Subscriber that has not yet unsubscribed. That is unnecessary overhead and complexity.

@rkuhn

While there certainly are Publishers with the characteristics you describe, this is not in general true: most “hot” sources will have knowledge of whom to produce to in a central place already.

That being said, your point is valid that requiring this behavior of all Publishers is too limiting. What about changing the MUST into a RECOMMENDED, meaning that the Publisher should inform the caller of subscribe that they are doing something wrong if possible; we are regulating a spec violation in any case.

Metrics and SLA

Another area that could be added is Metrics and SLA. Since a system like this is only as fast as its slowest component, different implementations could have an impedance mismatch.

There should also be a way to collect Metrics against the SLA to see whether the SLA is being violated. The Metrics could themselves be implemented as a stream to which interested consumers can subscribe. Metrics for a stream should be validated against its own SLA, the endpoint SLA, and any additional SLA configured.

Having a uniform way to collect Metrics and SLA would be a big value add, perhaps as an extension.

Not all components need to implement this; it can be optional and aimed at implementations targeting performance / speed / efficiency sensitive systems.

Since collecting Metrics can sometimes take cycles, there should also be an implementation of the component which does not spend those cycles.
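
One way to picture such an optional extension is a wrapper around an existing Subscriber, sketched below. Everything here is an assumption made for illustration: MeteredSubscriber and NoopSubscriber are hypothetical names, the "SLA" is reduced to a maximum allowed gap between two elements, and the JDK's java.util.concurrent.Flow interfaces stand in for the org.reactivestreams ones. Components that do not want to pay for the bookkeeping would simply not be wrapped.

```java
import java.util.concurrent.Flow;
import java.util.function.LongSupplier;

// Hypothetical metrics decorator: forwards every signal unchanged and records
// the element count and the largest gap between consecutive elements.
final class MeteredSubscriber<T> implements Flow.Subscriber<T> {
    private final Flow.Subscriber<? super T> downstream;
    private final LongSupplier clockNanos; // injectable so the sketch is testable
    // Plain fields: signals arrive serially; reading from other threads would need more care.
    private long count;
    private long lastNanos;
    private long maxGapNanos;

    MeteredSubscriber(Flow.Subscriber<? super T> downstream, LongSupplier clockNanos) {
        this.downstream = downstream;
        this.clockNanos = clockNanos;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        downstream.onSubscribe(subscription);
    }

    @Override
    public void onNext(T item) {
        long now = clockNanos.getAsLong();
        if (count > 0) {
            maxGapNanos = Math.max(maxGapNanos, now - lastNanos);
        }
        lastNanos = now;
        count++;
        downstream.onNext(item);
    }

    @Override public void onError(Throwable error) { downstream.onError(error); }
    @Override public void onComplete() { downstream.onComplete(); }

    long count() { return count; }
    long maxGapNanos() { return maxGapNanos; }

    // Assumed SLA definition: no gap between elements may exceed the given bound.
    boolean violatesSla(long allowedGapNanos) {
        return maxGapNanos > allowedGapNanos;
    }
}

// Downstream placeholder (assumption) that ignores every signal.
final class NoopSubscriber<T> implements Flow.Subscriber<T> {
    @Override public void onSubscribe(Flow.Subscription subscription) { }
    @Override public void onNext(T item) { }
    @Override public void onError(Throwable error) { }
    @Override public void onComplete() { }
}
```

In real use the clock would typically be System::nanoTime, and the collected values could in turn be published as their own stream for interested consumers.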

Producer.produceTo and Consumer

What is the reason for the API not using the same subscription model as the SPI? I believe it's about trying to allow libraries to be as generic as possible, but it seems to have gone to an extreme that loses all point of having types since these communicate little to nothing (particularly the empty Consumer type).

The Producer.produceTo(consumer) signature does not communicate lifecycle at all (since Consumer has no methods on it) and this is a critical aspect of a stream since terminal states (onComplete and onError) can and will occur.

If the intent is to have Producer/Consumer types exposed to library users, at this point they are so generic as to be useless in communicating any intent. Thus, each library will need to hide these behind their own interfaces and they become little different than the SPI.

In short, what's the point of the API types (Consumer and Producer) in their current form?

Routing Mechanism for Systems with Large Number of Streams (Producers / Consumers) with Selective Interest in Events

Hi,

Good to see an effort to make similar products interoperate. One aspect that so far has not been addressed is routing.

E.g. if you have n producers and m consumers, and each consumer selectively wants events from the producers, it might be an added overhead to have n * m streams. Also, if there are n nodes each interested in what the other nodes produce, but only in selected events, you will need on the order of n² streams (one per pair of nodes). In such cases an intermediate router / switch can come into play. The router will maintain n + m (vs. n * m) streams in the 1st case and n (vs. roughly n²) streams in the 2nd case.

What the router needs to do is:

  1. Decide when to receive data from each producer stream it manages (a stream with a producer at the other end) and which consumer streams (a stream with a consumer at the other end) to send that data to
  2. Coalesce data from different producers, since this data may be out of order and may contain redundant events which need to be removed / replaced / delayed / prioritized

The issues that need to be taken care of are:

  1. Ensure that the right events are taken from and sent to each stream
  2. Ensure that the right number of events is taken from and sent to each stream
  3. Ensure all events are sent unless there is a dropping or filtering scheme in play
  4. Handle routing and propagation of errors in an appropriate manner, i.e., if a consumer reports an issue with a value, it needs to be routed back to the producer if such handling is implemented in the stream
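
A very rough sketch of such a router is given below, to make the first three points more tangible. It is an illustration under assumptions, not a proposal for the spec: Router, addConsumer, request and publish are hypothetical names, everything runs synchronously on one thread, error routing is left out, and the per-consumer buffer is unbounded, whereas a real router would bound it by limiting what it requests from the producers.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical router: producers call publish, consumers register an interest
// predicate and signal demand, so n + m connections replace n * m streams.
final class Router<E> {
    // One outbound route: what the consumer wants, where to send it, how much it asked for.
    private static final class Route<E> {
        final Predicate<? super E> interest;
        final Consumer<? super E> sink;
        final Queue<E> pending = new ArrayDeque<>();
        long demand;

        Route(Predicate<? super E> interest, Consumer<? super E> sink) {
            this.interest = interest;
            this.sink = sink;
        }

        // Deliver buffered events while the consumer still has outstanding demand.
        void drain() {
            while (demand > 0 && !pending.isEmpty()) {
                demand--;
                sink.accept(pending.remove());
            }
        }
    }

    private final List<Route<E>> routes = new ArrayList<>();

    // Registers a consumer and returns a handle used to signal demand.
    int addConsumer(Predicate<? super E> interest, Consumer<? super E> sink) {
        routes.add(new Route<>(interest, sink));
        return routes.size() - 1;
    }

    // Consumer `id` signals that it can accept `n` more events.
    void request(int id, long n) {
        Route<E> route = routes.get(id);
        route.demand += n;
        route.drain();
    }

    // Called for every event from any producer; returns how many routes matched.
    int publish(E event) {
        int matched = 0;
        for (Route<E> route : routes) {
            if (route.interest.test(event)) {
                matched++;
                route.pending.add(event);
                route.drain();
            }
        }
        return matched;
    }
}
```

Only events matching a consumer's predicate reach it, and never more than it has requested; matching events that arrive without demand wait in that consumer's queue until the next request.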

Suminda
