Comments (9)

hochgi commented on August 21, 2024

@raboof do you know which change broke it?
Any chance reference.conf settings were overridden?
https://github.com/akka/akka-stream-contrib/blob/master/contrib/src/main/resources/reference.conf

If the configured timeout isn't long enough, the stage will fail with an IllegalStateException instead of the expected exception.
(The test took 23ms, so a config or killSwitch change is the only guess I have right now as to why it failed...)

P.S.
We've been running unfoldFlow in production for several months now:
https://github.com/thomsonreuters/CM-Well
and have seen no failures in it so far...

Will try to see if I can reproduce it later.

from akka-stream-contrib.

hochgi commented on August 21, 2024

Well, it's only reproducible in 2.5(.3), and not deterministically.
It fails with fusing both on & off (which doesn't matter anyway, since 2.5.x always has fusing enabled and the setting is deprecated).
When all other tests are ignored, it passes with both on & off.

hochgi commented on August 21, 2024

Added some debug prints, and got the following findings:
test code:

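// materialize the stream, keeping both the kill switch and the test probe
val (killSwitch, sink) = source.toMat(TestSink.probe)(Keep.both).run()
val kill = new Exception("KILL!")
sink.ensureSubscription()
killSwitch.abort(kill) // abort the stream with a known exception
sink.request(1)        // then signal demand from downstream
sink.expectError(kill) // the probe should see the abort exception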

expectations:

killSwitch is invoked, causing a race between upstream & downstream cancellations:

scenario 1:
killSwitch invoked -> flow propagates termination -> InHandler.onUpstreamFailure is invoked and the stream terminates properly.
scenario 2:
killSwitch invoked -> feedback's OutHandler.onDownstreamFinish is invoked -> a timer is scheduled for the configured 1 minute to give scenario 1 time to finish properly

In either case, the stream should have been canceled.
what actually happened was:

1- killSwitch invoked
2- test request(1) -> output's onPull -> feedback's onPull -> onPush -> push
3- feedback's OutHandler.onDownstreamFinish is invoked
4- InHandler's onUpstreamFailure is invoked

The problem is that killSwitch is still operational after being aborted.

This seems like a bug in KillSwitch!

After being aborted, it shouldn't have handled either onPull or onPush.

johanandren commented on August 21, 2024

abort() is not synchronous, so you cannot be sure the abort command crosses the asynchronous boundary before the request and emit happen. That's a race that is hard to work around, I'm afraid. The only thing I can think of that would make it less racy is a sleep before requesting.
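
A rough sketch of the "sleep before requesting" mitigation, assuming Akka 2.5-style APIs and the akka-stream-testkit probes. The `source` here is a hypothetical stand-in (`Source.repeat` behind a `KillSwitches.single`), not the unfoldFlow source from the failing test, and the 200 ms pause is an arbitrary value that only makes the race less likely rather than removing it:

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, KillSwitches}
import akka.stream.scaladsl.{Keep, Source}
import akka.stream.testkit.scaladsl.TestSink

object AbortThenSleepSketch extends App {
  implicit val system: ActorSystem = ActorSystem("abort-then-sleep")
  implicit val mat: ActorMaterializer = ActorMaterializer()

  try {
    // Stand-in for the `source` under test (an assumption, not the unfoldFlow source).
    val source = Source.repeat(1).viaMat(KillSwitches.single)(Keep.right)

    val (killSwitch, sink) = source.toMat(TestSink.probe[Int])(Keep.both).run()
    val kill = new Exception("KILL!")
    sink.ensureSubscription()
    killSwitch.abort(kill)
    // abort() is asynchronous; pause so the failure is likely processed before demand arrives.
    Thread.sleep(200)
    sink.request(1)
    sink.expectError(kill)
  } finally {
    system.terminate()
  }
}
```

Without the pause, the probe may receive an element before the error, which is the ordering described in the findings above.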

hochgi commented on August 21, 2024

@johanandren so is this test even needed? perhaps we should just drop the test entirely...

johanandren commented on August 21, 2024

Sounds like "should fail instantly when aborted" should cover that it works, but I didn't look into the code.

I'm not quite sure why there need to be special tests for KillSwitch here; it doesn't do anything a cancelled/failed/completed stream does not already do.

hochgi commented on August 21, 2024

Since the unfold source closes over a given unknown flow, we need to handle flow termination carefully.
We close over the given flow from both upstream and downstream.
So, if onDownstreamFinish() is called, we can't (yet) know how to terminate.
Instead, we wait for a configurable amount of time for either onUpstreamFinish() or onUpstreamFailure(ex).
If the configured time is exceeded, we terminate with fail(IllegalStateException).

Because of that, there's a difference between a KillSwitch being appended or prepended to the flow. (Maybe a test that explicitly covers the downstream failure case should be added.)

Also, the reason this test was added was to check the following scenario:

  1. The inner flow signals termination from downstream (onDownstreamFinish() is called)
  2. We wait (1m by default) for flow termination via onUpstreamFinish() or onUpstreamFailure(ex)
  3. The source is pulled
  4. We get onUpstreamFailure(ex)

It's a delicate edge case, and IIRC I had a bug where the pull changed the state in such a way that the flow didn't terminate.

johanandren commented on August 21, 2024

Can you perhaps put two probes in there instead, by using Flow.fromSinkAndSource and have full control over what goes on in the inner flow?
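
A minimal sketch of that idea, assuming Akka 2.5-style APIs and the akka-stream-testkit probes. To stay self-contained it drives the probe-backed inner flow from a plain `Source.repeat` rather than from unfoldFlow, so it only shows how the two probes let a test choose the order of cancellation, pull, and failure; the object and value names are illustrative:

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Source}
import akka.stream.testkit.scaladsl.{TestSink, TestSource}

object InnerFlowProbesSketch extends App {
  implicit val system: ActorSystem = ActorSystem("inner-flow-probes")
  implicit val mat: ActorMaterializer = ActorMaterializer()

  try {
    // Inner flow whose input and output sides are each controlled by a probe.
    val inner = Flow.fromSinkAndSourceMat(TestSink.probe[Int], TestSource.probe[Int])(Keep.both)

    val ((innerIn, innerOut), result) =
      Source.repeat(1).viaMat(inner)(Keep.right).toMat(TestSink.probe[Int])(Keep.both).run()

    val boom = new Exception("KILL!")
    innerIn.cancel()         // 1. inner flow cancels its input side (seen as a downstream finish)
    result.request(1)        // 2. downstream pulls while termination is still pending
    innerOut.sendError(boom) // 3. inner flow then fails its output side (seen as an upstream failure)
    result.expectError(boom) // the failure should reach the final downstream
  } finally {
    system.terminate()
  }
}
```

Because each step is triggered explicitly from the test thread, the ordering no longer depends on when an asynchronous abort() happens to be processed.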

hochgi commented on August 21, 2024

Good idea. Will try to get to it soon and open a quick PR.
