Comments (17)
What about the bounded equivalent, EmitterProcessor? Then it's up to the SignalEmitter to drop or not, as you're doing:
    EmitterProcessor<Float> randomNumberGenerator = EmitterProcessor.create();
    randomNumberGenerator.log().subscribeOn(Computations.concurrent()).publishOn(Computations.concurrent()).subscribe();
    SignalEmitter<Float> emitter = randomNumberGenerator.connectEmitter();
    SecureRandom sr = new SecureRandom();
    int i = 1;
    while (true) {
        try {
            Thread.sleep(1000);
        } catch (Exception e) {
            e.printStackTrace();
        }
        Emission emission = emitter.emit(sr.nextFloat());
    }
from reactor-core.
Also note that, unlike the Schedulers equivalent in RxJava, we don't have static registries of Schedulers; Computations only creates a specific kind of Scheduler with a new thread pool. You should share a reference to it and let the operators call createWorker when a Subscriber subscribes.
from reactor-core.
Note that yield is now called create.
from reactor-core.
@smaldini I take note of the correct way of using Computations.
Regarding the use of EmitterProcessor, the sample code doesn't compile: randomNumberGenerator.log()... can never be reached because it is located after a while(true). Shouldn't that loop be placed as a callback somewhere else?
from reactor-core.
Edited indeed :
from reactor-core.
I'm afraid I still don't see the whole picture :-(
What I am missing is how to subscribe some subscribers to the randomNumberGenerator.
On the one hand I have:
public class Generator {
    private EmitterProcessor<Float> randomNumberGenerator;
    public void start() {
        randomNumberGenerator = EmitterProcessor.create();
        randomNumberGenerator.log().subscribeOn(Computations.concurrent()).publishOn(Computations.concurrent()).subscribe();
        SignalEmitter<Float> emitter = randomNumberGenerator.connectEmitter();
        SecureRandom sr = new SecureRandom();
        int i = 1;
        while (true) {
            try {
                Thread.sleep(1000);
            } catch (Exception e) {
                e.printStackTrace();
            }
            Emission emission = emitter.emit(sr.nextFloat());
        }
    }
    public EmitterProcessor<Float> getRandomNumberGenerator() {
        return randomNumberGenerator;
    }
}
On the other:
...
Generator generator = new Generator();
generator.start();
Thread.sleep(6000);
System.out.println("WAKE UP");
RnApp app = new RnApp("APP");
RnApp xxx = new RnApp("XXX");
generator.getRandomNumberGenerator().subscribe(app);
generator.getRandomNumberGenerator().subscribe(xxx);
Thread.sleep(6000);
System.out.println("WAKE UP 2");
app.request(5);
xxx.request(5);
Thread.sleep(30000);
System.out.println("WAKE UP 3");
app.request(5);
xxx.request(5);
latch.await();
}
The problem here is that generator.start() blocks the thread, so the RnApp instances never subscribe.
This is how I did something similar in RxJava:
public void testRealTimeSubject() throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(1);
    ConnectableObservable<Integer> observable = Observable.range(1, 1000)
        .doOnCompleted(() -> {
            latch.countDown();
        })
        .subscribeOn(Schedulers.io())
        .publish();
    PublishSubject<Integer> realTimeSubject = PublishSubject.<Integer>create();
    SensorReaderObserver sensor1 = new SensorReaderObserver();
    SensorReaderObserver sensor2 = new SensorReaderObserver();
    observable.subscribe(sensor1);
    observable.subscribe(realTimeSubject);
    observable.connect();
    Thread.sleep(40);
    realTimeSubject.subscribe(sensor2);
    latch.await();
    Assert.assertEquals(sensor1.getValues().size(), 1000);
    Assert.assertNotEquals(sensor2.getValues().size(), 1000);
}
from reactor-core.
The problem here is that generator.start() blocks the thread so RnApp's never subscribe.
Your code in the start() method:
    while (true) {
        ...
    }
Indeed, it will never exit!
Invoke that infinite loop from a different thread:
    Computations.single().schedule(() -> {
        while (true) {
            ...
        }
    });
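As a rough illustration of the same idea without Reactor, here is a minimal sketch that hands a loop to a worker thread so the caller is not blocked. It uses only the JDK's ExecutorService as a stand-in for Computations.single(); the class and method names (BackgroundLoop, runLoopOffCallerThread) are hypothetical, and the loop is bounded so that the example terminates.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: not Reactor API, just the JDK.
public class BackgroundLoop {

    // Runs the emitting loop on a single worker thread and returns what it produced.
    public static List<Integer> runLoopOffCallerThread(int count) {
        List<Integer> emitted = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        ExecutorService single = Executors.newSingleThreadExecutor();

        // execute() returns immediately; the loop itself runs on the worker thread.
        single.execute(() -> {
            for (int i = 0; i < count; i++) {
                emitted.add(i);
            }
            done.countDown();
        });

        // The caller is free at this point (for example, to subscribe consumers).
        // Here it simply waits for the bounded loop to finish.
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("loop did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            single.shutdown();
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(runLoopOffCallerThread(3)); // prints [0, 1, 2]
    }
}
```

With a real while(true) loop the worker thread never returns, so the executor should be shared and shut down explicitly when the generator is no longer needed.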
from reactor-core.
@artembilan thanks, I just didn't know where to put that loop so that it didn't block the main thread. With the following modifications both RnApp instances receive the messages, but they keep receiving prefetched values:
Generator class:
public class Generator {
    private EmitterProcessor<Float> randomNumberGenerator;
    private Scheduler concurrent = Computations.concurrent();
    public void start() {
        randomNumberGenerator = EmitterProcessor.create();
        randomNumberGenerator.log().subscribeOn(concurrent).publishOn(concurrent).subscribe();
        Computations.single().schedule(() -> {
            SignalEmitter<Float> emitter = randomNumberGenerator.connectEmitter();
            SecureRandom sr = new SecureRandom();
            int i = 1;
            while (true) {
                try {
                    Thread.sleep(1000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                Emission emission = emitter.emit(sr.nextFloat());
            }
        });
    }
    public EmitterProcessor<Float> getRandomNumberGenerator() {
        return randomNumberGenerator;
    }
}
Test:
Generator generator = new Generator();
generator.start();
Thread.sleep(6000);
System.out.println("WAKE UP");
RnApp app = new RnApp("APP");
RnApp xxx = new RnApp("XXX");
generator.getRandomNumberGenerator().subscribe(app);
generator.getRandomNumberGenerator().subscribe(xxx);
Thread.sleep(6000);
System.out.println("WAKE UP 2");
app.request(5);
xxx.request(5);
Thread.sleep(30000);
System.out.println("WAKE UP 3");
app.request(5);
xxx.request(5);
latch.await();
As you can see in the following logs, everything that is emitted between the subscribe() call and the request() call is cached and delivered later, so it still doesn't behave like a PublishSubject:
...
08:58:09.790 [single-1] INFO reactor.core.publisher.FluxLog - onNext(0.3271616)
WAKE UP
08:58:10.790 [single-1] INFO reactor.core.publisher.FluxLog - onNext(0.24437588)
08:58:11.791 [single-1] INFO reactor.core.publisher.FluxLog - onNext(0.10515916)
08:58:12.791 [single-1] INFO reactor.core.publisher.FluxLog - onNext(0.9924408)
08:58:13.792 [single-1] INFO reactor.core.publisher.FluxLog - onNext(0.63165975)
08:58:14.793 [single-1] INFO reactor.core.publisher.FluxLog - onNext(0.6097309)
08:58:15.793 [single-1] INFO reactor.core.publisher.FluxLog - onNext(0.46791637)
WAKE UP 2
main-APP got ------> 0.24437588
main-APP got ------> 0.10515916
main-APP got ------> 0.9924408
...
main-XXX got ------> 0.10515916
main-XXX got ------> 0.9924408
from reactor-core.
Not sure where the problem is in your code, but for me it works as expected:
@Test
public void testPublishSubscribe() throws InterruptedException {
    Scheduler concurrent = Computations.concurrent();
    EmitterProcessor<Long> timeGenerator = EmitterProcessor.create();
    timeGenerator
        .subscribeOn(concurrent)
        .publishOn(concurrent)
        .subscribe(v -> System.out.println("0: " + v));
    SignalEmitter<Long> emitter = timeGenerator.connectEmitter();
    Computations.single().schedule(() -> {
        while (true) {
            try {
                Thread.sleep(100);
            }
            catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("NEW VALUE");
            emitter.emit(System.currentTimeMillis());
        }
    });
    Thread.sleep(100);
    System.out.println("WAKE UP");
    timeGenerator.subscribe(v -> System.out.println("1: " + v));
    timeGenerator.subscribe(v -> System.out.println("2: " + v));
    Thread.sleep(100);
    System.out.println("ONE MORE SUBSCRIBER");
    timeGenerator.subscribe(v -> System.out.println("3: " + v));
    Thread.sleep(2000);
}
The logs look like:
NEW VALUE
0: 1462472571887
NEW VALUE
0: 1462472571988
NEW VALUE
0: 1462472572088
NEW VALUE
0: 1462472572217
WAKE UP
NEW VALUE
1: 1462472572317
2: 1462472572317
0: 1462472572317
NEW VALUE
1: 1462472572417
0: 1462472572417
2: 1462472572417
NEW VALUE
0: 1462472572523
1: 1462472572523
2: 1462472572523
NEW VALUE
1: 1462472572625
2: 1462472572625
0: 1462472572625
NEW VALUE
1: 1462472572725
0: 1462472572725
2: 1462472572725
ONE MORE SUBSCRIBER
NEW VALUE
1: 1462472572825
0: 1462472572825
2: 1462472572825
3: 1462472572825
NEW VALUE
1: 1462472572925
0: 1462472572925
2: 1462472572925
3: 1462472572925
So I really do see a new value on each emission, and new subscribers don't receive any old (cached?) values. Plus, all subscribers receive the same value.
IMO the PublishSubject goal is achieved.
from reactor-core.
Just added a buffer-less Processor called DirectProcessor, see 9b381c8. @akarnokd can confirm, but I think it's the direct equivalent.
So this works with unbounded demand, and what you are seeing is correct for the RS semantics: subscribers will see all events from the time they subscribed. It will not drop automatically when demand is insufficient.
from reactor-core.
@artembilan You subscribe with a consumer that prints every value that is emitted since the moment of the subscription:
timeGenerator.subscribe(v -> System.out.println("1: " + v));
timeGenerator.subscribe(v -> System.out.println("2: " + v));
In my case I subscribe with a Subscriber:
public class RnApp implements Subscriber<Float> {
    ...
    @Override
    public void onNext(Float f) {
        System.out.println(Thread.currentThread().getName() + "-" + name + " got ------> " + f);
    }
}
...
RnApp app = new RnApp("APP");
RnApp xxx = new RnApp("XXX");
generator.getRandomNumberGenerator().subscribe(app);
generator.getRandomNumberGenerator().subscribe(xxx);
Thread.sleep(6000);
System.out.println("WAKE UP 2");
app.request(5);
xxx.request(5);
@smaldini What got me mixed up is that I was expecting the subscriber to get values only from the moment it invoked request(n). However, as you say, every value emitted between the moment of the subscription and the invocation of request(n) is buffered and delivered when the subscriber actually calls request(n). Speaking from complete ignorance, couldn't this be a problem? I mean, somewhere there's a buffer of values that won't be delivered until request(n) is invoked. What if it is never called?
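To make the buffering behaviour described here concrete, the following is a minimal single-threaded sketch of a relay that holds values until demand arrives. It is not Reactor code: BufferingRelay and its methods are hypothetical names, and unlike the bounded EmitterProcessor discussed in this thread the queue here is unbounded, which is exactly the situation the question worries about.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical sketch: values emitted before request(n) are queued, then replayed.
public class BufferingRelay {
    private final Queue<Float> buffer = new ArrayDeque<>();
    private final List<Float> delivered = new ArrayList<>();
    private long demand;

    // Producer side: every value is queued, then delivered if demand allows.
    public void emit(float value) {
        buffer.add(value);
        drain();
    }

    // Consumer side: adds demand and flushes whatever was queued in the meantime.
    public void request(long n) {
        demand += n;
        drain();
    }

    private void drain() {
        while (demand > 0 && !buffer.isEmpty()) {
            delivered.add(buffer.poll());
            demand--;
        }
    }

    public List<Float> delivered() {
        return delivered;
    }

    public int buffered() {
        return buffer.size();
    }

    public static void main(String[] args) {
        BufferingRelay relay = new BufferingRelay();
        relay.emit(0.1f);
        relay.emit(0.2f);
        relay.emit(0.3f);
        relay.request(2);
        System.out.println(relay.delivered() + ", still buffered: " + relay.buffered());
    }
}
```

If request(n) is never called, everything stays in the queue, which is why a real implementation needs either a bound or a drop policy.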
from reactor-core.
@codependent I think that's the same problem hit by operators such as cache, and the classic solution is usually to attach a simple onBackpressureDrop() right after. The emitter is bounded, so if you call onNext on it directly you need to know what you're doing, as opposed to using connectEmitter() and the emit methods. In that case onNext will spin-wait if the buffer is full until room frees up downstream. It's still better than uncontrolled buffer growth.
Now you want to deal with individual "suspect" behavior from a subscriber perspective, not the processor itself, e.g. timeout(), onBackpressureDrop... You can even wrap the chain as a single Processor if you need it:
    FluxProcessor<X, X> droppingProcessor = FluxProcessor.wrap(emitterProcessor, emitterProcessor.onBackpressureDrop());
Or just return generator.onBackpressureDrop() as a consuming-side Flux.
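The drop-on-no-demand idea behind onBackpressureDrop can be sketched in plain Java as follows. This is an illustration of the concept only, not how Reactor implements the operator; DropGate and its methods are hypothetical, and the sketch is single-threaded.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: a value is delivered only if demand is outstanding, else dropped.
public class DropGate {
    private final List<Float> delivered = new ArrayList<>();
    private long demand;
    private long dropped;

    // Consumer side: signals that n more values may be delivered.
    public void request(long n) {
        demand += n;
    }

    // Producer side: nothing is buffered, so values without demand are lost.
    public void emit(float value) {
        if (demand == 0) {
            dropped++;
            return;
        }
        demand--;
        delivered.add(value);
    }

    public List<Float> delivered() {
        return delivered;
    }

    public long dropped() {
        return dropped;
    }

    public static void main(String[] args) {
        DropGate gate = new DropGate();
        gate.emit(0.1f);   // dropped: no demand yet
        gate.request(1);
        gate.emit(0.2f);   // delivered
        gate.emit(0.3f);   // dropped: demand already consumed
        System.out.println(gate.delivered() + ", dropped: " + gate.dropped());
    }
}
```

Because nothing is queued, a subscriber that requests late only sees values emitted after its request, which matches the behaviour asked for in this thread.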
from reactor-core.
@smaldini That's it: with the additional FluxProcessor the subscribers won't get buffered values.
Thank you everyone for looking into this.
Final code:
@Test
public void testPublishSubscribe() throws InterruptedException {
    Scheduler concurrent = Computations.concurrent();
    EmitterProcessor<Float> timeGenerator = EmitterProcessor.create();
    timeGenerator
        .subscribeOn(concurrent)
        .publishOn(concurrent);
    SignalEmitter<Float> emitter = timeGenerator.connectEmitter();
    Computations.single().schedule(() -> {
        SecureRandom sr = new SecureRandom();
        while (true) {
            try {
                Thread.sleep(1000);
            }
            catch (Exception e) {
                e.printStackTrace();
            }
            float random = sr.nextFloat();
            System.out.println("NEW VALUE ---------------" + random);
            emitter.emit(random);
        }
    });
    FluxProcessor<Float,Float> droppingProcessor = FluxProcessor.<Float,Float>wrap(timeGenerator, timeGenerator.onBackpressureDrop());
    Thread.sleep(4000);
    System.out.println("WAKE UP");
    RnApp aaa = new RnApp("AAA");
    RnApp zzz = new RnApp("ZZZ");
    droppingProcessor.subscribe(aaa);
    droppingProcessor.subscribe(zzz);
    Thread.sleep(4000);
    System.out.println("REQUESTING 5");
    aaa.request(5);
    zzz.request(5);
And the log sequence:
NEW VALUE ---------------0.7953491
NEW VALUE ---------------0.09977782
NEW VALUE ---------------0.15835232
WAKE UP
NEW VALUE ---------------0.28263623
NEW VALUE ---------------0.513252
NEW VALUE ---------------0.8830035
NEW VALUE ---------------0.31092662
REQUESTING 5
NEW VALUE ---------------0.42508847
single-1-AAA got ------> 0.42508847
single-1-ZZZ got ------> 0.42508847
from reactor-core.
@codependent if you intend to drop, I advise the new DirectProcessor, which internally is simpler and buffer-less but fails if downstream demand != Long.Max. Because you use an onBackpressureXxx operator right after it, we know demand will always be Long.Max.
from reactor-core.
@smaldini Since the requests won't happen immediately it fails and doesn't recover from it. In the end the subscribers never get anything.
Maybe I'm not using it right:
@Test
public void testPublishSubscribe() throws InterruptedException {
    Scheduler concurrent = Computations.concurrent();
    DirectProcessor<Float> timeGenerator = DirectProcessor.create();
    timeGenerator
        .subscribeOn(concurrent)
        .publishOn(concurrent);
    SignalEmitter<Float> emitter = timeGenerator.connectEmitter();
    Computations.single().schedule(() -> {
        SecureRandom sr = new SecureRandom();
        while (true) {
            try {
                Thread.sleep(1000);
            }
            catch (Exception e) {
                e.printStackTrace();
            }
            float random = sr.nextFloat();
            System.out.println("NEW VALUE ---------------" + random);
            emitter.emit(random);
        }
    });
    Thread.sleep(4000);
    System.out.println("WAKE UP");
    RnApp aaa = new RnApp("AAA");
    RnApp zzz = new RnApp("ZZZ");
    timeGenerator.subscribe(aaa);
    timeGenerator.subscribe(zzz);
    Thread.sleep(4000);
    System.out.println("REQUESTING 5");
    aaa.request(5);
    zzz.request(5);
    Thread.sleep(4000);
After the failure the subscribers request five values but don't get anything:
NEW VALUE ---------------0.6337121
NEW VALUE ---------------0.998461
NEW VALUE ---------------0.6341994
WAKE UP
NEW VALUE ---------------0.2999627
java.lang.IllegalStateException: Can't deliver value due to lack of requests
at reactor.core.publisher.DirectProcessor$DirectProcessorSubscription.onNext(DirectProcessor.java:314)
at reactor.core.publisher.DirectProcessor.onNext(DirectProcessor.java:100)
at reactor.core.subscriber.SubmissionEmitter.emit(SubmissionEmitter.java:150)
at io.pivotal.literx.Part10SubscribeOnPublishOn.lambda$1(Part10SubscribeOnPublishOn.java:258)
at reactor.core.publisher.Computations$ProcessorWorker.onNext(Computations.java:918)
at reactor.core.publisher.Computations$ProcessorWorker.onNext(Computations.java:882)
at reactor.core.publisher.TopicProcessor$TopicSubscriberLoop.run(TopicProcessor.java:877)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
java.lang.IllegalStateException: Can't deliver value due to lack of requests
at reactor.core.publisher.DirectProcessor$DirectProcessorSubscription.onNext(DirectProcessor.java:314)
at reactor.core.publisher.DirectProcessor.onNext(DirectProcessor.java:100)
at reactor.core.subscriber.SubmissionEmitter.emit(SubmissionEmitter.java:150)
at io.pivotal.literx.Part10SubscribeOnPublishOn.lambda$1(Part10SubscribeOnPublishOn.java:258)
at reactor.core.publisher.Computations$ProcessorWorker.onNext(Computations.java:918)
at reactor.core.publisher.Computations$ProcessorWorker.onNext(Computations.java:882)
at reactor.core.publisher.TopicProcessor$TopicSubscriberLoop.run(TopicProcessor.java:877)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
NEW VALUE ---------------0.961773
NEW VALUE ---------------0.08782846
NEW VALUE ---------------0.22994196
REQUESTING 5
NEW VALUE ---------------0.3262939
NEW VALUE ---------------0.14760607
NEW VALUE ---------------0.6409442
NEW VALUE ---------------0.97169876
from reactor-core.
You would still need onBackpressureDrop, as with the other Processor; that forces request Long.Max on subscribe.
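A simplified way to picture why a buffer-less relay needs unbounded demand: with no buffer and no drop policy, the only option left when a value arrives without demand is to fail. The sketch below is hypothetical plain Java (DirectRelay is not the Reactor class), and as a simplification it fails only when demand is exactly zero. The exception message is copied from the stack trace earlier in this thread.

```java
// Hypothetical sketch of a buffer-less relay that fails fast without demand.
public class DirectRelay {
    private long demand;
    private long delivered;

    // Adds demand; overflow is capped at Long.MAX_VALUE, which is treated as unbounded.
    public void request(long n) {
        long sum = demand + n;
        demand = sum < 0 ? Long.MAX_VALUE : sum;
    }

    // No buffer and no drop policy: a value without demand is an error.
    public void emit(float value) {
        if (demand == 0) {
            throw new IllegalStateException("Can't deliver value due to lack of requests");
        }
        if (demand != Long.MAX_VALUE) {
            demand--; // unbounded demand is never decremented
        }
        delivered++;
    }

    public long delivered() {
        return delivered;
    }

    public static void main(String[] args) {
        DirectRelay unbounded = new DirectRelay();
        unbounded.request(Long.MAX_VALUE); // what a drop operator in front would request
        unbounded.emit(0.1f);
        unbounded.emit(0.2f);
        System.out.println("delivered: " + unbounded.delivered());
    }
}
```

Placing a dropping stage between such a relay and slow subscribers means the relay always sees unbounded demand, while the dropping stage decides what the subscribers actually receive.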
from reactor-core.
Got it, changed from
EmitterProcessor<Float> timeGenerator = EmitterProcessor.create();
to
DirectProcessor<Float> timeGenerator = DirectProcessor.create();
keeping the dropping processor:
FluxProcessor<Float,Float> droppingProcessor = FluxProcessor.<Float,Float>wrap(timeGenerator, timeGenerator.onBackpressureDrop());
Works like a charm :-)
from reactor-core.