Comments (17)
@NiteshKant I agree with @drewhk: `onNext` is a signal, and its returning does not imply that any work has been performed, or even that the thing passed into `onNext` has reached its destination. The RS spec is very clear on this: https://github.com/reactive-streams/reactive-streams-jvm#asynchronous-vs-synchronous-processing
from rsocket-java.
Thanks all for the feedback.
Here is my take (plan) on this:
- Modify the current codebase to do an explicit copy before `onNext()`.
- At a later point in time, think about how to eliminate copying if there is a use case for that.
- Stick to copying being the default mode. Not copying would always be an opt-in.
Does the above approach sound reasonable to everyone here?
/cc @stevegury @robertroeser @tmontgomery
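The proposed default, copying before `onNext()`, can be sketched as follows. This is a minimal illustration, assuming a hypothetical `PooledPayload` type standing in for rsocket-java's pooled, ref-counted frames; none of these names are real rsocket-java API:

```java
import java.util.Arrays;

// Hypothetical stand-in for a pooled, ref-counted payload; not rsocket-java API.
final class PooledPayload {
    private byte[] data;

    PooledPayload(byte[] data) { this.data = data; }

    byte[] data() {
        if (data == null) throw new IllegalStateException("already released");
        return data;
    }

    // Returns the buffer to the pool; any later access fails.
    void release() { data = null; }

    // The proposed default: take a deep copy while the pooled buffer is
    // still valid. The copy is a plain heap array with no ties to the pool,
    // so it can safely be emitted via onNext() and cross async boundaries.
    byte[] copyForOnNext() { return Arrays.copyOf(data(), data().length); }
}
```

With this scheme the transport can call `release()` on the pooled payload as soon as `onNext()` returns, because subscribers only ever see the copy.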
> ReactiveSocket uses pooled frames and TCP transport uses Netty which uses reference counted buffers. Since there isn't an indication of when a consumer is done with a buffer/payload/frame, there isn't a convenient point at which we can release the pooled objects back to the pool.
I fully understand this.
> Auto-Release after callback completes.
This unfortunately breaks the RS contract. If you want to expose such an interface, then it cannot be a Reactive Streams Publisher but something else. I personally don't think this is a good idea, though. Remember that the whole point of RS is asynchronous boundaries; otherwise there wouldn't be a need for the `request()` signal at all!
> Provide an explicit way to dispose a pooled object.
This is the most flexible, but a rather dangerous option. The danger is that if the `onNext()` call dispatches a signal to a queue, and the consuming party dies because of a bug or other reasons, then the signals (containing the Payload) are GC'd, but that will not take care of cleaning up the resources allocated by Netty.
> Copy the buffer before handing off to the eventual consumer.
This is the safest bet when it comes to passing asynchronous boundaries.
Maybe the Publisher expects that side effects on the payload have already happened by the time `onNext()` returns?
@2m The lifetime of a `Payload` emitted by `ReactiveSocket` is limited to the `onNext()` callback. So, if the returned object is accessed outside the `onNext()` scope, it will not be available (as suggested by the `IllegalReferenceCountException`). Is that something `runWith(Sink.head)` tries to do?
PS: Sorry, I am not at all familiar with Scala or Akka.
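The lifetime rule described above can be illustrated with a small sketch. `AutoReleaseEmitter` and its nested payload type are illustrative names, not rsocket-java API: the payload is valid inside the callback and released as soon as it returns, so any reference retained past that point observes a released buffer, analogous to Netty's `IllegalReferenceCountException`:

```java
import java.util.function.Consumer;

// Illustrative only: models "lifetime of Payload is limited to onNext()".
final class AutoReleaseEmitter {
    static final class RefCountedPayload {
        private int refCnt = 1;
        private final String data;
        RefCountedPayload(String data) { this.data = data; }
        String data() {
            if (refCnt == 0)
                // Netty would throw IllegalReferenceCountException here.
                throw new IllegalStateException("refCnt: 0");
            return data;
        }
        void release() { refCnt = 0; }
    }

    // Invokes the callback, then releases the payload unconditionally:
    // the payload is only usable for the duration of the call.
    static void emit(String data, Consumer<RefCountedPayload> onNext) {
        RefCountedPayload p = new RefCountedPayload(data);
        try {
            onNext.accept(p);
        } finally {
            p.release(); // back to the pool; later access fails
        }
    }
}
```

Stashing the payload for later (as a downstream that buffers elements effectively does) and reading it after `emit()` returns then fails, matching the reported exception.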
This is then a violation of RS, because `onNext()` is an asynchronous signal. It might be synchronous for optimization purposes, but this is not guaranteed. If you assume that all processing has finished after the return of `onNext()`, you basically exclude all downstream asynchronous boundaries which might pass a Payload. When Akka Streams interfaces with other RS implementations, it dispatches all these signals asynchronously, mainly to prevent messing with actor-internal state from an unknown, external callstack/thread.
Here is more context on why: `ReactiveSocket` uses pooled frames, and the TCP transport uses Netty, which uses reference-counted buffers. Since there isn't an indication of when a consumer is done with a buffer/payload/frame, there isn't a convenient point at which we can release the pooled objects back to the pool. There are a few options:
- Auto-Release after callback completes.
- Provide an explicit way to dispose a pooled object.
- Copy the buffer before handing off to the eventual consumer.
Currently `ReactiveSocket` takes the first approach, as it is flexible enough to give a consumer the option to copy data on an asynchronous boundary. However, it has the disadvantage you noticed.
Which approach of the above (or otherwise) do you guys think is better?
You can copy to allow for async signals, but you can't uncopy if it's done for you. So it seems like there must be a mode that doesn't involve copies.
Would it work to have a flag on any Netty transports that flips between
- auto release
- require dispose
Ideally, using Netty ref counting would be nice, e.g. the flag just controls whether to increment the ref count before handing off.
my 2c
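The flag idea can be sketched with Netty-style retain/release semantics. The `FlaggedTransport` and `RefCounted` types below are hypothetical, not real rsocket-java or Netty API:

```java
import java.util.function.Consumer;

// Hypothetical sketch of the proposed transport flag; not real API.
final class FlaggedTransport {
    static final class RefCounted {
        private int refCnt = 1;
        int refCnt() { return refCnt; }
        void retain() { refCnt++; }
        void release() { if (refCnt > 0) refCnt--; }
    }

    private final boolean autoRelease;

    FlaggedTransport(boolean autoRelease) { this.autoRelease = autoRelease; }

    void deliver(RefCounted frame, Consumer<RefCounted> onNext) {
        if (autoRelease) {
            // Auto-release mode: frame is valid only inside the callback.
            try { onNext.accept(frame); } finally { frame.release(); }
        } else {
            // Require-dispose mode: bump the ref count before handing off;
            // the consumer now owns a reference and must release() it.
            frame.retain();
            try { onNext.accept(frame); } finally { frame.release(); } // drop the transport's own ref
        }
    }
}
```

In require-dispose mode the frame's count stays at 1 after delivery (the consumer's reference), so nothing is returned to the pool until the consumer calls `release()`.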
> You can copy to allow for async signals, but you can't uncopy if it's done for you. So it seems like there must be a mode that doesn't involve copies.
I get what you say, and I am not against having a synchronous mode; all I am saying is that an RS Publisher is not suitable for this mode. Relying on an RS Subscriber doing all its work in the `onNext()` call is not safe within the RS spec (the whole point of the spec is to regulate rate across asynchronous boundaries without blocking). If you still mandate this mode for RS Subscribers using this project, then you exclude a whole set of RS-compliant libraries and undermine the standard by forcing a subset of it on implementors. Since the whole idea was interoperability between libraries, this is not the way to go.
I agree that for APIs exposing a Reactive Streams `Publisher`, the data must be copied so the contract is met. For clients that wish to avoid the copy, a different interface could be exposed that clearly documents what the consumer must do.
> Would it work to have a flag on any Netty transports that flips between
@yschimke The scope of the frame is actually an artifact of frame pooling. TCP just uses the semantics to make sure it releases the buffer associated with the frame. So, the flag, or a different API as @benjchristensen suggests, would be associated with the frame of `ReactiveSocket`.
> So it seems like there must be a mode that doesn't involve copies.
Agreed.
How is this different than onNext-ing a raw `ByteBuf`? It's not, right? Why not just add a release method on the Frame/Payload?
> Does the above approach sound reasonable to everyone here?
It sounds good. I think there is value, though, in allowing a 1-to-1 mapper function to be passed in when you request the Publisher. The use case is that the very first step in streams will very likely be deserialization. If we copy the buffer just for it to be discarded almost immediately after deserialization, it would be rather wasteful. If there is a facility that allows mapping the buffer to a type T and then immediately reclaims the buffer, then we would reduce copying for all those cases that turn the Payload into a domain object as the first step. Of course, it should be documented that the mapper function should return an object that does not retain the Payload inside.
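A sketch of the suggested mapper facility (all names here are hypothetical, not rsocket-java API): the mapper runs while the buffer is still valid, its result is an ordinary GC-able object, and the buffer is reclaimed immediately afterwards, so no copy is needed:

```java
import java.util.function.Function;

// Hypothetical sketch of a "map then immediately reclaim" facility.
final class MappingSource {
    static final class PooledBuffer {
        private byte[] bytes;
        PooledBuffer(byte[] bytes) { this.bytes = bytes; }
        byte[] bytes() {
            if (bytes == null) throw new IllegalStateException("released");
            return bytes;
        }
        void release() { bytes = null; }
    }

    // The mapper must not retain the buffer; its result should be a plain
    // domain object. The buffer is released as soon as the mapper returns.
    static <T> T mapAndRelease(PooledBuffer buf, Function<PooledBuffer, T> mapper) {
        try {
            return mapper.apply(buf);
        } finally {
            buf.release();
        }
    }
}
```

For example, deserializing a UTF-8 string with `mapAndRelease(buf, b -> new String(b.bytes(), StandardCharsets.UTF_8))` yields a GC-able `String` and reclaims the buffer without an intermediate copy.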
> How is this different than onNext-ing a raw ByteBuf? It's not, right?
There is no difference.
> Why not just add a release method on the Frame/Payload?
The issue here is that these might end up in an element buffer, unconsumed and hence unreleased. A typical scenario is that you hand off N buffers through `onNext`, of which some number M are consumed and released, and then the downstream fails and cancels its subscription. Now N-M byte buffers are never reclaimed (a generic, RS-compliant downstream likely treats them as a buffer of elements of some type T and hence has no idea how to release them; also, the thread running it might be dead anyway because of a more fatal bug).
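The leak scenario can be made concrete with a small sketch (illustrative types only, not real API): N ref-counted buffers are queued, M are consumed and released, the downstream cancels, and the remaining N-M buffers are never released back to the pool.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative only: a generic downstream that buffers elements of type T
// has no way to release what it never consumed.
final class LeakDemo {
    static final class CountedBuf {
        boolean released;
        void release() { released = true; }
    }

    // Hand off n buffers, consume (and release) only m, then "cancel".
    // Returns the number of buffers that were never released.
    static int leakedAfterCancel(int n, int m) {
        Deque<CountedBuf> queue = new ArrayDeque<>();
        CountedBuf[] all = new CountedBuf[n];
        for (int i = 0; i < n; i++) queue.add(all[i] = new CountedBuf());
        for (int i = 0; i < m; i++) queue.remove().release(); // consumed
        queue.clear(); // downstream cancels; a generic Subscriber just drops
                       // its element buffer and cannot call release()
        int leaked = 0;
        for (CountedBuf b : all) if (!b.released) leaked++;
        return leaked;
    }
}
```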
> The issue here is that these might end up in an element buffer, unconsumed and hence unreleased.
I don't see an issue; I see descriptions of not using `ByteBuf` correctly. If you put a bunch of ByteBufs in a queue and don't release them, that isn't correct, right? Netty seems fairly popular, and people call release all the time without issue. There is no reason to get rid of zero copy if we add a release method to the Payload interface.
> I don't see an issue - I see descriptions of not using ByteBuf correctly. If you put a bunch of ByteBufs in a queue, and don't release, that isn't correct, right?
Of course, if you hand-code a Reactive Streams Subscriber that only consumes `ByteBuf` elements, then it can manage the queue as it wants. What I explicitly referred to, though, are RS Subscribers that are generic, i.e. parametrized over some type T. Those have no idea about the nature of the elements, and I assume many usages of the Publisher that is exposed here would be calling some form of `map` combinator, which is usually parametric in type.
This is why I proposed the possible injection of a mapper function before the exposed Publisher, so that, for example, it is possible to do deserialization without copying the buffer, and the end result is usually a GC-able domain object.
> Netty seems fairly popular and people call release all the time without issue.
Yes, but most Netty pipelines in practice
- do the processing synchronously (i.e. no buffer to worry about; method calls either return, fail with an exception, or end up in an infinite loop)
- are aware of byte buffers and are much more rarely generalized to some arbitrary type T
We are exposing a Publisher here, though, which is more general than the pipelines above.
Wasn't this addressed in #209 ?
This was addressed in #267.