Comments (4)
Hi, @RonBarkan 👋
Please consider the following:
@Test
void lateSuscribe() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
ConnectableFlux<Integer> publish = Flux.just(1)
.publish();
publish.subscribe(
r -> System.out.println("1.Next: " + r),
e -> System.out.println("1.Error " + e),
() -> System.out.println("1.Done"));
publish.connect();
publish.doFinally(s -> latch.countDown())
.subscribe(r -> System.out.println("2.Next: " + r),
e -> System.out.println("2.Error " + e),
() -> System.out.println("2.Done"));
latch.await();
}
And the output:
1.Next: 1
1.Done
It also does not finish. That is because, in my view, late-arriving Subscriber
has no defined contract in case of publish()
.
Please consider using replay()
to achieve the desired result. Please consult our reference documentation for some examples.
With the above, I'm closing this issue. For further questions around such behaviour, StackOverflow might be a good place. For instance, I found a similar question with a reasonable answer: https://stackoverflow.com/questions/59237587/reactor-publish-behavior
from reactor-core.
I'd expect the 2nd subscriber to get the complete signal. If the published flux had elements, the late subscriber should not see them and only get the complete signal.
This is what I understand from the marble diagram in the JavaDoc of Flux.publish(). The link you provided does not say anything about late subscribers, AFAICT. Consider updating the docs.
Lastly, the stackoverflow answer does exactly what I described above, and does not discuss hanging.
In any case, it seems a very weird desired behavior to have pipelines hang for this possibly subtle issue. Seems to me that even throwing an exception will be preferred. In particular, the published flux accepted the subscription and then ignored it. It is certainly a surprise to see your pipeline hang with the right set of inputs and it should be a design goal to reduce API usage surprises.
from reactor-core.
(Regarding replay()
thank you for your suggestion. Even though the original pipeline had a significant flaw, not related to this issue, both publish()
and replay() / cache()
are not appropriate. The real code is being change to do something else.)
from reactor-core.
Thanks for providing more feedback @RonBarkan. I did a bit of digging in the history and found #2897 - it feels it's the same subject, so I marked this as duplicate. It also contains the suggestion of using replay(history=0)
to overcome the limitations of publish
. After giving it a second thought, it does feel unexpected to have a hanging process and coordinating anything following a completion to avoid the late subscription feels undesired. I can't tell much about the priorities and timeline around this, as there are other ongoing efforts, but a contribution is always welcome. At the same time, having a workaround takes away the pressure I suppose.
from reactor-core.
Related Issues (20)
- Mono.share() allow a stream to be canceled HOT 5
- Flaky test - FluxBlackboxProcessorVerification HOT 6
- Flaky test - DefaultTestSubscriberTest HOT 5
- context lost when using Mono.create with threads HOT 2
- [test] Verify Initialization of Default Labels
- Too difficult to control how much Reactor buffers internally HOT 2
- Enabled Automatic Context Propagation and context propagation with lift causes ClassCastException HOT 10
- [Flaky test] FluxCreateTest.fluxCreateOnRequestMultipleThreadsSlowProducer
- BoundedElasticThreadPerTaskSchedulerTest > ensuresTasksScheduling() FAILED HOT 4
- SinksTest > OptimisticEmitFailureHandlerTest > shouldRetryOptimistically() FAILED
- Add bufferWeightedWithin operator.
- thenMany does not ignore all emissions of a concat due to incorrect optimization HOT 2
- FluxBufferWhenTest > timedOutBuffersDontLeak() FAILED
- Support Considering Individual Element Weight in Determining Buffer Boundary instead of Element Counts
- Javadoc for some versions is missing from the website HOT 3
- Flux.mergeSequential does not subscribe to last Producer in specific circumstances HOT 1
- Failing while building reactor-core version 3.4.18 using ./gradlew build (io.projectreactor:reactor-core:3.4.18) HOT 4
- Fatal exceptions not caught in onErrorDropped Hook HOT 2
- autoConnect(0) seems to be broken - late subscribers receive data
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from reactor-core.