Comments (17)

viktorklang commented on May 22, 2024

@NiteshKant I agree with @drewhk: onNext is a signal, and its returning does not imply that any work has been performed, or even that the element passed into onNext has reached its destination. The RS spec is very clear on this: https://github.com/reactive-streams/reactive-streams-jvm#asynchronous-vs-synchronous-processing
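The point that onNext is only a signal can be illustrated with a small sketch. It uses the JDK's java.util.concurrent.Flow interfaces (which mirror the Reactive Streams API) instead of rsocket-java types, and AsyncHandoffSubscriber and AsyncSignalDemo are hypothetical names. The subscriber hands each element to its own executor, much as an asynchronous RS implementation may, so onNext returns before the element is processed; the latch exists only to make the demo deterministic.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;

// Hypothetical subscriber that treats onNext as a signal: it enqueues each
// element on its own single-threaded executor and returns immediately.
final class AsyncHandoffSubscriber<T> implements Flow.Subscriber<T> {
    final List<T> processed = new CopyOnWriteArrayList<>();
    final CountDownLatch gate = new CountDownLatch(1);  // holds the worker back (demo only)
    final CountDownLatch done = new CountDownLatch(1);
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }

    @Override public void onNext(T item) {
        worker.execute(() -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            processed.add(item);
        });
        // onNext returns here; the item has not necessarily been processed yet.
    }

    @Override public void onError(Throwable t) { worker.shutdown(); }

    @Override public void onComplete() {
        worker.execute(done::countDown);  // runs after all previously queued items
        worker.shutdown();
    }
}

final class AsyncSignalDemo {
    /** Returns {items processed when onNext returned, items processed after draining}. */
    static int[] run() {
        AsyncHandoffSubscriber<String> sub = new AsyncHandoffSubscriber<>();
        sub.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) { }
            @Override public void cancel() { }
        });
        sub.onNext("payload");
        int afterOnNext = sub.processed.size();  // worker is still gated
        sub.onComplete();
        sub.gate.countDown();
        try {
            sub.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { afterOnNext, sub.processed.size() };
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("processed after onNext: " + r[0] + ", after drain: " + r[1]);
    }
}
```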

from rsocket-java.

NiteshKant commented on May 22, 2024

Thanks all for the feedback.

Here is my take (plan) on this:

  • Modify the current codebase to do an explicit copy before onNext().
  • At a later point, think about how to eliminate copying if there is a use case for it.
  • Stick to copying being the default mode. Not copying would always be an opt-in.

Does the above approach sound reasonable to everyone here?
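A minimal sketch of the copy-by-default plan, assuming a hypothetical PooledBuffer class as a stand-in for a pooled, reference-counted Netty ByteBuf (it is not Netty's API) and the JDK Flow.Subscriber in place of the Reactive Streams interface. CopyingEmitter copies the bytes into an unpooled array, releases the pooled buffer, and only then calls onNext, so the subscriber may keep the element as long as it likes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;

// Minimal reference-counted stand-in for a pooled Netty ByteBuf (hypothetical).
final class PooledBuffer {
    private final byte[] bytes;
    private final AtomicInteger refCnt = new AtomicInteger(1);

    PooledBuffer(byte[] bytes) { this.bytes = bytes; }

    byte[] copyBytes() {
        if (refCnt.get() <= 0) throw new IllegalStateException("refCnt: 0");
        return bytes.clone();  // unpooled copy owned by the receiver
    }

    void release() { refCnt.decrementAndGet(); }
    int refCnt() { return refCnt.get(); }
}

// Default mode: copy, return the pooled buffer, then signal onNext.
final class CopyingEmitter {
    static void emit(PooledBuffer frame, Flow.Subscriber<? super byte[]> subscriber) {
        byte[] copy = frame.copyBytes();
        frame.release();           // the pooled buffer never crosses the boundary
        subscriber.onNext(copy);   // the subscriber may keep the copy indefinitely
    }
}

// Subscriber that holds on to elements after onNext returns.
final class CollectingSubscriber implements Flow.Subscriber<byte[]> {
    final List<byte[]> items = new ArrayList<>();
    @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
    @Override public void onNext(byte[] item) { items.add(item); }
    @Override public void onError(Throwable t) { }
    @Override public void onComplete() { }
}

final class CopyDemo {
    public static void main(String[] args) {
        PooledBuffer frame = new PooledBuffer(new byte[] {1, 2, 3});
        CollectingSubscriber sub = new CollectingSubscriber();
        CopyingEmitter.emit(frame, sub);
        System.out.println("pooled refCnt = " + frame.refCnt()
                + ", kept = " + Arrays.toString(sub.items.get(0)));
    }
}
```

An opt-in, no-copy mode would skip copyBytes() and hand the pooled buffer over directly, which is where the ownership questions discussed in this thread come in.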

cc @stevegury @robertroeser @tmontgomery

drewhk commented on May 22, 2024

> ReactiveSocket uses pooled frames and TCP transport uses Netty which uses reference counted buffers. Since there isn't an indication of when a consumer is done with a buffer/payload/frame, there isn't a convenient point at which we can release the pooled objects back to the pool.

I fully understand this.

> Auto-Release after callback completes.

Unfortunately, this breaks the RS contract. If you want to expose such an interface, then it cannot be a Reactive Streams Publisher but must be something else. I personally don't think this is a good idea, though. Remember that the whole point of RS is asynchronous boundaries; otherwise there wouldn't be a need for the request() signal at all!

> Provide an explicit way to dispose a pooled object.

This is the most flexible option, but a rather dangerous one. The danger is that if the onNext() call dispatches a signal to a queue and the consuming party dies because of a bug or for other reasons, the signals (containing the Payload) are GC'd, but garbage collection will not clean up the resource allocated by Netty.

> Copy the buffer before handing off to the eventual consumer.

This is the safest bet when it comes to passing asynchronous boundaries.

drewhk commented on May 22, 2024

Maybe the Publisher expects that side-effects on the payload already happened after onNext() returns?

NiteshKant commented on May 22, 2024

@2m The lifetime of a Payload emitted by ReactiveSocket is limited to the onNext() callback. So, if the Payload is accessed outside the onNext() scope, it will no longer be valid (as the IllegalReferenceCountException suggests). Is that something runWith(Sink.head) tries to do?

PS: Sorry, I am not at all familiar with Scala or Akka.
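The scoping problem can be reproduced in a few lines. This sketch assumes a hypothetical PooledBuffer that throws IllegalStateException after release (standing in for Netty's IllegalReferenceCountException) and uses a plain java.util.function.Consumer in place of onNext. AutoReleasingEmitter mimics the auto-release-after-callback approach described in this thread; it is not rsocket-java's actual code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Minimal reference-counted stand-in for a pooled Netty ByteBuf (hypothetical).
final class PooledBuffer {
    private final byte[] bytes;
    private final AtomicInteger refCnt = new AtomicInteger(1);

    PooledBuffer(byte[] bytes) { this.bytes = bytes; }

    byte readByte(int index) {
        // Stands in for Netty's IllegalReferenceCountException.
        if (refCnt.get() <= 0) throw new IllegalStateException("refCnt: 0");
        return bytes[index];
    }

    void release() { refCnt.decrementAndGet(); }
}

// Auto-release: the buffer is valid only while the callback runs.
final class AutoReleasingEmitter {
    static void emit(PooledBuffer frame, Consumer<PooledBuffer> onNext) {
        try {
            onNext.accept(frame);
        } finally {
            frame.release();  // returned to the "pool" as soon as the callback returns
        }
    }
}

final class ScopeDemo {
    // Reading inside the callback works.
    static byte readInsideCallback() {
        byte[] seen = new byte[1];
        AutoReleasingEmitter.emit(new PooledBuffer(new byte[] {42}), b -> seen[0] = b.readByte(0));
        return seen[0];
    }

    // Keeping a reference and reading it later (as an asynchronous consumer would) fails.
    static boolean accessAfterCallbackFails() {
        List<PooledBuffer> escaped = new ArrayList<>();
        AutoReleasingEmitter.emit(new PooledBuffer(new byte[] {42}), escaped::add);
        try {
            escaped.get(0).readByte(0);
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("inside callback: " + readInsideCallback()
                + ", access after callback fails: " + accessAfterCallbackFails());
    }
}
```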

drewhk commented on May 22, 2024

This is then a violation of RS, because onNext() is an asynchronous signal. It might be delivered synchronously as an optimization, but this is not guaranteed. If you assume that all processing has finished once onNext() returns, you basically exclude all downstream asynchronous boundaries that might pass a Payload along. When Akka Streams interfaces with other RS implementations, it dispatches all these signals asynchronously, mainly to avoid touching actor-internal state from an unknown, external call stack/thread.

NiteshKant commented on May 22, 2024

Here is more context on why:

ReactiveSocket uses pooled frames and TCP transport uses Netty which uses reference counted buffers. Since there isn't an indication of when a consumer is done with a buffer/payload/frame, there isn't a convenient point at which we can release the pooled objects back to the pool. There are a few options:

  • Auto-Release after callback completes.
  • Provide an explicit way to dispose a pooled object.
  • Copy the buffer before handing off to the eventual consumer.

Currently, ReactiveSocket takes the first approach because it is flexible: it gives a consumer the option to copy data at an asynchronous boundary. However, it has the disadvantage you noticed.

Which of the above approaches (or another one) do you think is better?

yschimke commented on May 22, 2024

You can copy to allow for async signals, but you can't uncopy if it's done for you. So it seems like there must be a mode that doesn't involve copies.

Would it work to have a flag on any Netty transports that flips between

  • auto release
  • require dispose

Ideally this would use Netty reference counting, e.g. the flag just controls whether to increment the ref count before handing off.

my 2c
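One way such a flag could work, sketched with hypothetical names (ReleaseMode, FlaggedEmitter, and a minimal PooledBuffer standing in for a Netty ByteBuf; none of these are rsocket-java or Netty APIs). In REQUIRE_DISPOSE mode the transport increments the reference count before handing off, so the consumer owns one reference and must release it; in AUTO_RELEASE mode the buffer is returned as soon as the callback completes.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical transport-level switch (not an rsocket-java or Netty API).
enum ReleaseMode { AUTO_RELEASE, REQUIRE_DISPOSE }

// Minimal reference-counted stand-in for a pooled Netty ByteBuf (hypothetical).
final class PooledBuffer {
    private final AtomicInteger refCnt = new AtomicInteger(1);
    PooledBuffer retain() { refCnt.incrementAndGet(); return this; }
    void release() { refCnt.decrementAndGet(); }
    int refCnt() { return refCnt.get(); }
}

final class FlaggedEmitter {
    private final ReleaseMode mode;

    FlaggedEmitter(ReleaseMode mode) { this.mode = mode; }

    void emit(PooledBuffer frame, Consumer<PooledBuffer> consumer) {
        if (mode == ReleaseMode.REQUIRE_DISPOSE) {
            frame.retain();   // hand the consumer its own reference; it must call release()
        }
        try {
            consumer.accept(frame);
        } finally {
            frame.release();  // drop the transport's reference once the callback returns
        }
    }
}

final class FlagDemo {
    /** Reference count left on the buffer after emit, with a consumer that never releases. */
    static int refCntAfterEmit(ReleaseMode mode) {
        PooledBuffer frame = new PooledBuffer();
        new FlaggedEmitter(mode).emit(frame, f -> { });
        return frame.refCnt();
    }

    public static void main(String[] args) {
        for (ReleaseMode mode : ReleaseMode.values()) {
            System.out.println(mode + ": refCnt after emit = " + refCntAfterEmit(mode));
        }
    }
}
```

In REQUIRE_DISPOSE mode the remaining reference is exactly what the consumer is obliged to release later.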

drewhk commented on May 22, 2024

> You can copy to allow for async signals, but you can't uncopy if it's done for you. So it seems like there must be a mode that doesn't involve copies.

I get what you are saying, and I am not against having a synchronous mode; all I am saying is that an RS Publisher is not suitable for this mode. Relying on an RS Subscriber doing all its work within the onNext() call is not safe under the RS spec (the whole point of the spec is to regulate rate across asynchronous boundaries without blocking). If you still mandate this mode for RS Subscribers using this project, then you exclude a whole set of RS-compliant libraries and undermine the standard by forcing a subset of it on implementors. Since the whole idea was interoperability between libraries, this is not the way to go.

benjchristensen commented on May 22, 2024

I agree that for APIs exposing a ReactiveStreams Publisher, the data must be copied so the contract is met. For clients that wish to avoid the copy, a different interface could be exposed that clearly documents what the consumer must do.
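A rough sketch of what such a separate, clearly documented interface might look like. ZeroCopyPayloadSource, ReusingSource, and ZeroCopyDemo are hypothetical, and a single reused java.nio.ByteBuffer simulates pooling. The contract lives in the Javadoc rather than in the RS spec: the buffer is valid only while the handler runs, so a consumer decodes it synchronously and keeps only the decoded value.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical zero-copy interface, deliberately NOT a Reactive Streams Publisher.
 * Contract: the ByteBuffer passed to the handler is valid only until
 * handler.accept returns. Do not store it or pass it to another thread;
 * copy or decode it inside the callback if the data is needed later.
 */
interface ZeroCopyPayloadSource {
    void forEachPayload(Consumer<ByteBuffer> handler);
}

// Simulates pooling by reusing one buffer for every payload.
final class ReusingSource implements ZeroCopyPayloadSource {
    private final ByteBuffer reused = ByteBuffer.allocate(64);
    private final String[] messages;

    ReusingSource(String... messages) { this.messages = messages; }

    @Override
    public void forEachPayload(Consumer<ByteBuffer> handler) {
        for (String m : messages) {
            reused.clear();
            reused.put(m.getBytes(StandardCharsets.UTF_8));  // assumes each message fits in 64 bytes
            reused.flip();
            handler.accept(reused);  // same object every time: contents change after return
        }
    }
}

final class ZeroCopyDemo {
    // Correct use: decode synchronously inside the callback and keep only the String.
    static List<String> decodeAll(ZeroCopyPayloadSource source) {
        List<String> out = new ArrayList<>();
        source.forEachPayload(buf -> out.add(StandardCharsets.UTF_8.decode(buf).toString()));
        return out;
    }

    public static void main(String[] args) {
        System.out.println(decodeAll(new ReusingSource("hello", "world")));
    }
}
```

Because this interface is not a Publisher, generic RS operators cannot be attached to it by accident, which keeps the no-copy contract separate from the RS one.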

NiteshKant commented on May 22, 2024

> Would it work to have a flag on any Netty transports that flips between

@yschimke The scope of the frame is actually an artifact of frame pooling. The TCP transport just uses these semantics to make sure it releases the buffer associated with the frame. So the flag, or a different API as @benjchristensen suggests, would be associated with the ReactiveSocket frame.

> So it seems like there must be a mode that doesn't involve copies.

Agreed.

robertroeser commented on May 22, 2024

How is this different from onNext-ing a raw ByteBuf? It's not, right? Why not just add a release method on the Frame/Payload?

drewhk commented on May 22, 2024

> Does the above approach sound reasonable to everyone here?

It sounds good. I think there is value, though, in allowing a 1-to-1 mapper function to be passed in when you request the Publisher. The use case is that the very first step in a stream will very likely be deserialization. Copying the buffer only to discard the copy almost immediately after deserialization would be rather wasteful. If there is a facility that maps the buffer to a type T and then immediately reclaims the buffer, we would avoid copying in all those cases that turn the Payload into a domain object as the first step. Of course, it should be documented that the mapper function must return an object that does not retain the Payload.
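The proposed mapper facility might look roughly like this. MappingEmitter, PooledBuffer, and Collecting are hypothetical names, and the JDK Flow.Subscriber stands in for the RS interface. The mapper runs while the pooled buffer is still valid, the buffer is released immediately afterwards, and only the mapped value (here a String standing in for a domain object) is passed to onNext. Error handling is simplified: a real implementation would signal onError rather than throw.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Minimal reference-counted stand-in for a pooled Netty ByteBuf (hypothetical).
final class PooledBuffer {
    private final byte[] bytes;
    private final AtomicInteger refCnt = new AtomicInteger(1);

    PooledBuffer(byte[] bytes) { this.bytes = bytes; }

    String toUtf8() {
        if (refCnt.get() <= 0) throw new IllegalStateException("refCnt: 0");
        return new String(bytes, StandardCharsets.UTF_8);
    }

    void release() { refCnt.decrementAndGet(); }
    int refCnt() { return refCnt.get(); }
}

// Hypothetical: map the pooled buffer 1-to-1 to T, release it, then emit only T.
final class MappingEmitter<T> {
    private final Function<PooledBuffer, T> mapper;  // must not retain the buffer

    MappingEmitter(Function<PooledBuffer, T> mapper) { this.mapper = mapper; }

    void emit(PooledBuffer frame, Flow.Subscriber<? super T> subscriber) {
        T value;
        try {
            value = mapper.apply(frame);
        } finally {
            frame.release();  // reclaimed before anything crosses an async boundary
        }
        subscriber.onNext(value);
    }
}

// Simple subscriber that keeps every element past onNext.
final class Collecting<T> implements Flow.Subscriber<T> {
    final List<T> items = new ArrayList<>();
    @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
    @Override public void onNext(T item) { items.add(item); }
    @Override public void onError(Throwable t) { }
    @Override public void onComplete() { }
}

final class MapperDemo {
    public static void main(String[] args) {
        PooledBuffer frame = new PooledBuffer("hi".getBytes(StandardCharsets.UTF_8));
        Collecting<String> sub = new Collecting<>();
        new MappingEmitter<String>(PooledBuffer::toUtf8).emit(frame, sub);
        System.out.println(sub.items + ", refCnt = " + frame.refCnt());
    }
}
```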

> How is this different from onNext-ing a raw ByteBuf? It's not, right?

There is no difference.

> Why not just add a release method on the Frame/Payload?

The issue here is that these might end up in an element buffer, unconsumed and hence unreleased. A typical scenario: you hand off N buffers through onNext, some number M of them are consumed and released, then the downstream fails and cancels its subscription. Now N-M byte buffers are never reclaimed (a generic, RS-compliant downstream likely treats its buffer as elements of some type T and hence has no idea how to release them; also, the thread running it might be dead anyway because of a more fatal bug).
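The N-M arithmetic can be checked with a small simulation. GenericBufferingStage is a hypothetical, element-agnostic stage that queues values of type T and simply drops its queue on cancel, and PooledBuffer is a minimal reference-counted stand-in for a ByteBuf (not Netty's API). With N = 5 buffers handed off and M = 2 consumed and released, the remaining 3 are never returned to the pool.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Minimal reference-counted stand-in for a pooled Netty ByteBuf (hypothetical).
final class PooledBuffer {
    private final AtomicInteger refCnt = new AtomicInteger(1);
    void release() { refCnt.decrementAndGet(); }
    int refCnt() { return refCnt.get(); }
}

// Hypothetical generic stage: queues elements of any type T and knows
// nothing about releasing them.
final class GenericBufferingStage<T> {
    private final Deque<T> queue = new ArrayDeque<>();
    void onNext(T item) { queue.add(item); }
    T poll() { return queue.poll(); }
    void cancel() { queue.clear(); }  // elements become garbage; pooled memory is not returned
}

final class LeakDemo {
    /** Hands off n buffers, consumes and releases m (m <= n), cancels, and counts unreleased buffers. */
    static int leakedAfterCancel(int n, int m) {
        List<PooledBuffer> handedOff = new ArrayList<>();
        GenericBufferingStage<PooledBuffer> stage = new GenericBufferingStage<>();
        for (int i = 0; i < n; i++) {
            PooledBuffer b = new PooledBuffer();
            handedOff.add(b);
            stage.onNext(b);
        }
        for (int i = 0; i < m; i++) {
            stage.poll().release();  // the consumed elements are released correctly
        }
        stage.cancel();              // downstream fails and cancels
        int leaked = 0;
        for (PooledBuffer b : handedOff) {
            if (b.refCnt() > 0) leaked++;
        }
        return leaked;
    }

    public static void main(String[] args) {
        System.out.println("leaked: " + leakedAfterCancel(5, 2));
    }
}
```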

robertroeser commented on May 22, 2024

@drewhk

> The issue here is that these might end up in an element buffer, unconsumed and hence unreleased

I don't see an issue; I see descriptions of ByteBuf not being used correctly. If you put a bunch of ByteBufs in a queue and don't release them, that isn't correct, right? Netty seems fairly popular, and people call release all the time without issue. There is no reason to give up zero-copy if we add a release method to the Payload interface.

drewhk commented on May 22, 2024

@robertroeser

> I don't see an issue - I see descriptions of not using ByteBuf correctly. If you put a bunch of ByteBufs in a queue, and don't release, that isn't correct, right?

Of course, if you hand-code a Reactive Streams Subscriber that only consumes ByteBuf elements, then it can manage the queue as it wants. What I explicitly referred to, though, are RS Subscribers that are generic, i.e. parameterized over some type T. Those have no idea about the nature of the elements, and I assume many users of the Publisher exposed here would call some form of map combinator, which is usually parametric in type.
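For contrast, a hand-coded subscriber that knows its elements are buffers can release whatever is still queued when it cancels or receives onError. BufferAwareSubscriber, BufferAwareDemo, and PooledBuffer are hypothetical; the sketch is single-threaded for brevity and uses the JDK Flow interfaces in place of the RS ones. With 5 buffers handed off and 2 consumed before cancel, nothing is left unreleased, but only because the element type is fixed; a subscriber generic over T could not do this.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;

// Minimal reference-counted stand-in for a pooled Netty ByteBuf (hypothetical).
final class PooledBuffer {
    private final AtomicInteger refCnt = new AtomicInteger(1);
    void release() { refCnt.decrementAndGet(); }
    int refCnt() { return refCnt.get(); }
}

// Hypothetical buffer-aware subscriber (not thread-safe; single-threaded demo).
final class BufferAwareSubscriber implements Flow.Subscriber<PooledBuffer> {
    private final Deque<PooledBuffer> queue = new ArrayDeque<>();
    private Flow.Subscription subscription;

    @Override public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        s.request(Long.MAX_VALUE);
    }
    @Override public void onNext(PooledBuffer b) { queue.add(b); }
    @Override public void onError(Throwable t) { releaseQueued(); }
    @Override public void onComplete() { }

    PooledBuffer poll() { return queue.poll(); }

    // The downstream gives up: cancel upstream and return every queued buffer.
    void cancel() {
        subscription.cancel();
        releaseQueued();
    }

    private void releaseQueued() {
        PooledBuffer b;
        while ((b = queue.poll()) != null) {
            b.release();
        }
    }
}

final class BufferAwareDemo {
    /** Hands off n buffers, consumes and releases m (m <= n), cancels, and counts unreleased buffers. */
    static int leakedAfterCancel(int n, int m) {
        List<PooledBuffer> handedOff = new ArrayList<>();
        BufferAwareSubscriber sub = new BufferAwareSubscriber();
        sub.onSubscribe(new Flow.Subscription() {
            @Override public void request(long k) { }
            @Override public void cancel() { }
        });
        for (int i = 0; i < n; i++) {
            PooledBuffer b = new PooledBuffer();
            handedOff.add(b);
            sub.onNext(b);
        }
        for (int i = 0; i < m; i++) {
            sub.poll().release();
        }
        sub.cancel();
        int leaked = 0;
        for (PooledBuffer b : handedOff) {
            if (b.refCnt() > 0) leaked++;
        }
        return leaked;
    }

    public static void main(String[] args) {
        System.out.println("leaked: " + leakedAfterCancel(5, 2));
    }
}
```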

This is why I proposed the possible injection of a mapper function before the exposed Publisher, so for example it is possible to do deserialization without copying the buffer, and the end result is usually a GC-able domain object.

> Netty seems fairly popular and people call release all the time without issue.

Yes, but in practice most Netty pipelines

  • do the processing synchronously (i.e. there is no buffer to worry about: method calls either return, fail with an exception, or end up in an infinite loop)
  • are aware of ByteBufs and are only rarely generalized to some arbitrary type T

We are exposing here a Publisher though which is more general than the pipelines above.

ktoso commented on May 22, 2024

Wasn't this addressed in #209?

robertroeser commented on May 22, 2024

This was addressed by #267.