Coder Social home page Coder Social logo

Comments (17)

smaldini avatar smaldini commented on May 22, 2024

What about the bounded equivalent : EmitterProcessor then it's up to SignalEmitter to drop or not like how you're doing :

EmitterProcessor<Float> randomNumberGenerator = EmitterProcessor.create()

randomNumberGenerator.log().subscribeOn(Computations.concurrent()).publishOn(Computations.concurrent()).subscribe();    


SignalEmitter<Float> emitter = randomNumberGenerator.connectEmitter();

SecureRandom sr = new SecureRandom();
int i = 1;
while(true){
     try {
             Thread.sleep(1000);
     } catch (Exception e) {
             e.printStackTrace();
     }
     Emission emission = emitter.emit(sr.nextFloat());
}

from reactor-core.

smaldini avatar smaldini commented on May 22, 2024

Also note that unlike Schedulers equivalent in RxJava, we don't have static registries of Schedulers and Computations only create a specific kind of Scheduler with a new thread pool. You should share a ref to it and leave the operators use createWorker when a Subscriber subscribes.

from reactor-core.

smaldini avatar smaldini commented on May 22, 2024

Note that yield is now called create.

from reactor-core.

codependent avatar codependent commented on May 22, 2024

@smaldini I take note of the correct way of using Computations.

Regarding the use of EmitterProcessor, the sample code doesn't compile: randomNumberGenerator.log()... can never be reached as is located after a while(true). Shouldn't that loop be placed as a callback somewhere else?

from reactor-core.

smaldini avatar smaldini commented on May 22, 2024

Edited indeed :

from reactor-core.

codependent avatar codependent commented on May 22, 2024

I'm afraid I still don't see the whole picture :-(

What I am missing is how to subscribe some subscribers to the randomNumberGenerator.

On the one hand I have:

    public class Generator{

        private EmitterProcessor<Float> randomNumberGenerator;

        public void start(){
            randomNumberGenerator = EmitterProcessor.create();
            randomNumberGenerator.log().subscribeOn(Computations.concurrent()).publishOn(Computations.concurrent()).subscribe();    

            SignalEmitter<Float> emitter = randomNumberGenerator.connectEmitter();
            SecureRandom sr = new SecureRandom();
            int i = 1;
            while(true){
                 try {
                         Thread.sleep(1000);
                 } catch (Exception e) {
                         e.printStackTrace();
                 }
                 Emission emission = emitter.emit(sr.nextFloat());
            }
        }

        public EmitterProcessor<Float> getRandomNumberGenerator() {
            return randomNumberGenerator;
        }
    }

On the other:

               ...
        Generator generator = new Generator();
        generator.start();
        Thread.sleep(6000);
        System.out.println("WAKE UP");
        RnApp app = new RnApp("APP");
        RnApp xxx = new RnApp("XXX");
        generator.getRandomNumberGenerator().subscribe(app);
        generator.getRandomNumberGenerator().subscribe(xxx);
        Thread.sleep(6000);
        System.out.println("WAKE UP 2"); 
        app.request(5);
        xxx.request(5);

        Thread.sleep(30000);
        System.out.println("WAKE UP 3");
        app.request(5);
        xxx.request(5);

        latch.await();
    }

The problem here is that generator.start() blocks the thread so RnApp's never subscribe.

This is how I did something similar in RxJava:

    public void testRealTimeSubject() throws InterruptedException{
        CountDownLatch latch = new CountDownLatch(1);

        ConnectableObservable<Integer> observable = Observable.range(1, 1000)
        .doOnCompleted(() -> {
            latch.countDown();
        })
        .subscribeOn(Schedulers.io())
        .publish();

        PublishSubject<Integer> realTimeSubject = PublishSubject.<Integer>create();

        SensorReaderObserver sensor1 = new SensorReaderObserver();
        SensorReaderObserver sensor2 = new SensorReaderObserver();

        observable.subscribe(sensor1);
        observable.subscribe(realTimeSubject);
        observable.connect();
        Thread.sleep(40);
        realTimeSubject.subscribe(sensor2);

        latch.await();

        Assert.assertEquals(sensor1.getValues().size(), 1000);
        Assert.assertNotEquals(sensor2.getValues().size(), 1000);
    }

from reactor-core.

artembilan avatar artembilan commented on May 22, 2024

The problem here is that generator.start() blocks the thread so RnApp's never subscribe.

Your code in the start() method:

 while(true){
...
}

Indeed, it never will exit!
Invoke that infinite loop from the different Thread:

Computations.single().schedule(() -> {
     while(true){
      ...
     }    
});

from reactor-core.

codependent avatar codependent commented on May 22, 2024

@artembilan thanks, I just didn't know where to put that loop so that it didn't block the main thread. With the following modifications both RunApp instances receive the messages but, they keep receiving prefreched values:

Generator class:

public class Generator{

    private EmitterProcessor<Float> randomNumberGenerator;
    private Scheduler concurrent = Computations.concurrent();

    public void start(){
        randomNumberGenerator = EmitterProcessor.create();
        randomNumberGenerator.log().subscribeOn(concurrent).publishOn(concurrent).subscribe();    

        Computations.single().schedule( () -> {
            SignalEmitter<Float> emitter = randomNumberGenerator.connectEmitter();
            SecureRandom sr = new SecureRandom();
            int i = 1;
            while(true){
                try {
                    Thread.sleep(1000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                Emission emission = emitter.emit(sr.nextFloat());
            }
        });
    }

    public EmitterProcessor<Float> getRandomNumberGenerator() {
        return randomNumberGenerator;
    }
}

Test:

Generator generator = new Generator();
generator.start();
Thread.sleep(6000);
System.out.println("WAKE UP");
RnApp app = new RnApp("APP");
RnApp xxx = new RnApp("XXX");
generator.getRandomNumberGenerator().subscribe(app);
generator.getRandomNumberGenerator().subscribe(xxx);
Thread.sleep(6000);
System.out.println("WAKE UP 2"); 
app.request(5);
xxx.request(5);

Thread.sleep(30000);
System.out.println("WAKE UP 3");
app.request(5);
xxx.request(5);

latch.await();

As you can see in the following logs, everything that is emitted between the subscribe() call and the request() call is cached and delivered later so still it doesn't behave like a PublishSubscriber:

...
08:58:09.790 [single-1] INFO  reactor.core.publisher.FluxLog -  onNext(0.3271616)
WAKE UP
08:58:10.790 [single-1] INFO  reactor.core.publisher.FluxLog -  onNext(0.24437588)
08:58:11.791 [single-1] INFO  reactor.core.publisher.FluxLog -  onNext(0.10515916)
08:58:12.791 [single-1] INFO  reactor.core.publisher.FluxLog -  onNext(0.9924408)
08:58:13.792 [single-1] INFO  reactor.core.publisher.FluxLog -  onNext(0.63165975)
08:58:14.793 [single-1] INFO  reactor.core.publisher.FluxLog -  onNext(0.6097309)
08:58:15.793 [single-1] INFO  reactor.core.publisher.FluxLog -  onNext(0.46791637)
WAKE UP 2
main-APP got ------> 0.24437588
main-APP got ------> 0.10515916
main-APP got ------> 0.9924408
...
main-XXX got ------> 0.10515916
main-XXX got ------> 0.9924408

from reactor-core.

artembilan avatar artembilan commented on May 22, 2024

Not sure where is a problem with your code, but for me it works as expected:

    @Test
    public void testPublishSubscribe() throws InterruptedException {
        Scheduler concurrent = Computations.concurrent();

        EmitterProcessor<Long> timeGenerator = EmitterProcessor.create();

        timeGenerator
                .subscribeOn(concurrent)
                .publishOn(concurrent)
                .subscribe(v -> System.out.println("0: " + v));

        SignalEmitter<Long> emitter = timeGenerator.connectEmitter();

        Computations.single().schedule(() -> {

            while (true) {
                try {
                    Thread.sleep(100);
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println("NEW VALUE");
                emitter.emit(System.currentTimeMillis());
            }

        });

        Thread.sleep(100);

        System.out.println("WAKE UP");

        timeGenerator.subscribe(v -> System.out.println("1: " + v));
        timeGenerator.subscribe(v -> System.out.println("2: " + v));

        Thread.sleep(100);

        System.out.println("ONE MORE SUBSCRIBER");

        timeGenerator.subscribe(v -> System.out.println("3: " + v));

        Thread.sleep(2000);
    }

The logs look like:

NEW VALUE
0: 1462472571887
NEW VALUE
0: 1462472571988
NEW VALUE
0: 1462472572088
NEW VALUE
0: 1462472572217
WAKE UP
NEW VALUE
1: 1462472572317
2: 1462472572317
0: 1462472572317
NEW VALUE
1: 1462472572417
0: 1462472572417
2: 1462472572417
NEW VALUE
0: 1462472572523
1: 1462472572523
2: 1462472572523
NEW VALUE
1: 1462472572625
2: 1462472572625
0: 1462472572625
NEW VALUE
1: 1462472572725
0: 1462472572725
2: 1462472572725
ONE MORE SUBSCRIBER
NEW VALUE
1: 1462472572825
0: 1462472572825
2: 1462472572825
3: 1462472572825
NEW VALUE
1: 1462472572925
0: 1462472572925
2: 1462472572925
3: 1462472572925

So, I really see a new value on each emission and all new subscribers don't receive any old (cached ?) values. Plus all subscribers received the same value.
IMO the PublishSubject goal is achieved.

from reactor-core.

smaldini avatar smaldini commented on May 22, 2024

Just added a buffer-less Processor called DirectProcessor see 9b381c8. @akarnokd can confirm but I think it's the direct equivalent.
So this works with unbounded demand and what you are seeing is correct for the RS semantics as subscribers will see all event from the time on they have subscribed. It will not be dropping automatically when demand is not enough.

from reactor-core.

codependent avatar codependent commented on May 22, 2024

@artembilan You subscribe with a consumer that prints every value that is emitted since the moment of the subscription:

timeGenerator.subscribe(v -> System.out.println("1: " + v));
timeGenerator.subscribe(v -> System.out.println("2: " + v));

In my case I subscribe with a Subscriber:

public class RnApp implements Subscriber<Float>{
    ...
    @Override
    public void onNext(Float f) {
        System.out.println(Thread.currentThread().getName()+"-"+name+ " got ------> "+f);
    }
}
...
RnApp app = new RnApp("APP");
RnApp xxx = new RnApp("XXX");
generator.getRandomNumberGenerator().subscribe(app);
generator.getRandomNumberGenerator().subscribe(xxx);

Thread.sleep(6000);

System.out.println("WAKE UP 2"); 
app.request(5);
xxx.request(5);

@smaldini What got me mixed up is that I was expecting that the subscriber would get the values from the moment that it invoked request(n). However, as you say, every value emitted between the moment of the subscription and the invocation of request(n) is buffered and delivered when the subscriber actually calls request(n). Talking from the complete ignorance, couldn't this be a problem? I mean, somewhere there's a buffer of values that won't be delivered until request(n) invocation. What if it is never called?

from reactor-core.

smaldini avatar smaldini commented on May 22, 2024

@codependent I think that's the same problem hit by operators such as cache etc and the classic solution is usually to attach a simple onBackpressureDrop() right after. The emitter is bounded so if you onNext to it directly, first you need to know what you're doing vs using connectEmitter() and the emit methods. Then the onNext will spin-wait if the buffer is full until one room frees up downstream. It's still better than uncontroller buffer grow.

Now you want to deal with individual "suspect" behavior from a subscriber perspective not the processor itself, e.g. : timeout(), onBackpressureDrop... You can even wrap the chain as single Processor if you need it :

FluxProcessor<X, X> droppingProcessor = FluxProcessor.wrap(emitterProcessor, emitterProcessor.onBackpressureDrop());

Or just return generator.onBackpressureDrop() as a consuming-side Flux.

from reactor-core.

codependent avatar codependent commented on May 22, 2024

@smaldini That's it, with the additional FluxProcessor the subscribers won't get buffered values,

Thank you everyone for looking into this.

Final code:

     @Test
     public void testPublishSubscribe() throws InterruptedException {

        Scheduler concurrent = Computations.concurrent();

        EmitterProcessor<Float> timeGenerator = EmitterProcessor.create();
        timeGenerator
            .subscribeOn(concurrent)
            .publishOn(concurrent);

        SignalEmitter<Float> emitter = timeGenerator.connectEmitter();

        Computations.single().schedule(() -> {
            SecureRandom sr = new SecureRandom();
            while (true) {
                try {
                    Thread.sleep(1000);
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
                float random = sr.nextFloat();
                System.out.println("NEW VALUE ---------------"+random);
                emitter.emit(random);
            }
        });

        FluxProcessor<Float,Float> droppingProcessor = FluxProcessor.<Float,Float>wrap(timeGenerator, timeGenerator.onBackpressureDrop());

        Thread.sleep(4000);

        System.out.println("WAKE UP");

        RnApp aaa = new RnApp("AAA");
        RnApp zzz = new RnApp("ZZZ");
        droppingProcessor.subscribe(aaa);
        droppingProcessor.subscribe(zzz);

        Thread.sleep(4000);

        System.out.println("REQUESTING 5");
        aaa.request(5);
        zzz.request(5);

And the log sequence:

NEW VALUE ---------------0.7953491
NEW VALUE ---------------0.09977782
NEW VALUE ---------------0.15835232
WAKE UP
NEW VALUE ---------------0.28263623
NEW VALUE ---------------0.513252
NEW VALUE ---------------0.8830035
NEW VALUE ---------------0.31092662
REQUESTING 5
NEW VALUE ---------------0.42508847
single-1-AAA got ------> 0.42508847
single-1-ZZZ got ------> 0.42508847

from reactor-core.

smaldini avatar smaldini commented on May 22, 2024

@codependent if you intent to drop I advise the new DirectProcessor, which internally is simpler and buffer-less but fails if downstream demand != Long.Max. Because you use an onBackpressureXxx operator right after we know this is only going to be the case.

from reactor-core.

codependent avatar codependent commented on May 22, 2024

@smaldini Since the requests won't happen immediately it fails and doesn't recover from it. In the end the subscribers never get anything.

Maybe I'm not using it right:

     @Test
     public void testPublishSubscribe() throws InterruptedException {

        Scheduler concurrent = Computations.concurrent();

        DirectProcessor<Float> timeGenerator = DirectProcessor.create();
        timeGenerator
            .subscribeOn(concurrent)
            .publishOn(concurrent);

        SignalEmitter<Float> emitter = timeGenerator.connectEmitter();

        Computations.single().schedule(() -> {
            SecureRandom sr = new SecureRandom();
            while (true) {
                try {
                    Thread.sleep(1000);
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
                float random = sr.nextFloat();
                System.out.println("NEW VALUE ---------------"+random);
                emitter.emit(random);
            }
        });

        Thread.sleep(4000);

        System.out.println("WAKE UP");

        RnApp aaa = new RnApp("AAA");
        RnApp zzz = new RnApp("ZZZ");
        timeGenerator.subscribe(aaa);
        timeGenerator.subscribe(zzz);

        Thread.sleep(4000);

        System.out.println("REQUESTING 5");
        aaa.request(5);
        zzz.request(5);

        Thread.sleep(4000);

After the faifure the subscribers request five values but don't get anything:

NEW VALUE ---------------0.6337121
NEW VALUE ---------------0.998461
NEW VALUE ---------------0.6341994
WAKE UP
NEW VALUE ---------------0.2999627
java.lang.IllegalStateException: Can't deliver value due to lack of requests
    at reactor.core.publisher.DirectProcessor$DirectProcessorSubscription.onNext(DirectProcessor.java:314)
    at reactor.core.publisher.DirectProcessor.onNext(DirectProcessor.java:100)
    at reactor.core.subscriber.SubmissionEmitter.emit(SubmissionEmitter.java:150)
    at io.pivotal.literx.Part10SubscribeOnPublishOn.lambda$1(Part10SubscribeOnPublishOn.java:258)
    at reactor.core.publisher.Computations$ProcessorWorker.onNext(Computations.java:918)
    at reactor.core.publisher.Computations$ProcessorWorker.onNext(Computations.java:882)
    at reactor.core.publisher.TopicProcessor$TopicSubscriberLoop.run(TopicProcessor.java:877)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
java.lang.IllegalStateException: Can't deliver value due to lack of requests
    at reactor.core.publisher.DirectProcessor$DirectProcessorSubscription.onNext(DirectProcessor.java:314)
    at reactor.core.publisher.DirectProcessor.onNext(DirectProcessor.java:100)
    at reactor.core.subscriber.SubmissionEmitter.emit(SubmissionEmitter.java:150)
    at io.pivotal.literx.Part10SubscribeOnPublishOn.lambda$1(Part10SubscribeOnPublishOn.java:258)
    at reactor.core.publisher.Computations$ProcessorWorker.onNext(Computations.java:918)
    at reactor.core.publisher.Computations$ProcessorWorker.onNext(Computations.java:882)
    at reactor.core.publisher.TopicProcessor$TopicSubscriberLoop.run(TopicProcessor.java:877)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
NEW VALUE ---------------0.961773
NEW VALUE ---------------0.08782846
NEW VALUE ---------------0.22994196
REQUESTING 5
NEW VALUE ---------------0.3262939
NEW VALUE ---------------0.14760607
NEW VALUE ---------------0.6409442
NEW VALUE ---------------0.97169876

from reactor-core.

smaldini avatar smaldini commented on May 22, 2024

You would still need onBackpressureDrop as with the other Processor, that forces request Long.Max on subscribe.

from reactor-core.

codependent avatar codependent commented on May 22, 2024

Got it, changed from
EmitterProcessor<Float> timeGenerator = EmitterProcessor.create();
to
DirectProcessor<Float> timeGenerator = DirectProcessor.create();

keeping the dropping processor:
FluxProcessor<Float,Float> droppingProcessor = FluxProcessor.<Float,Float>wrap(timeGenerator, timeGenerator.onBackpressureDrop());

Works like a charm :-)

from reactor-core.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.