Coder Social home page Coder Social logo

akka-contrib-extra's Introduction

akka-contrib-extra

Build Status

Welcome to akka-contrib-extra!

This project contains various utilities that are useful when working with Akka.

This repository is not actively maintained, and intended to be retired. Useful pieces should be either proposed to be included in the main Akka, Alpakka or Akka HTTP projects, or migrated to the personal accounts of contributors willing to maintain the library. Feel free to migrate elements to other repositories (under the terms of the Apache license) or submit motivated proposals to include elements in the main Akka projects.

© Typesafe Inc., 2014

akka-contrib-extra's People

Contributors

2m avatar fsat avatar hseeberger avatar huntc avatar jeantil avatar ktoso avatar longshorej avatar markusjura avatar patriknw avatar raboof avatar rkuhn avatar sethtisue avatar viktorklang avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

akka-contrib-extra's Issues

Default for maxRetries in ReconnectingStreamHttp.outgoingConnection must be Int.MaxValue

scala> Timeout(1.second * Int.MaxValue)
res1: akka.util.Timeout = Timeout(2147483647 seconds)

scala> Timeout(1.second * Long.MaxValue)
java.lang.IllegalArgumentException: requirement failed: Duration is limited to +-(2^63-1)ns (ca. 292 years)
  at scala.concurrent.duration.FiniteDuration.<init>(Duration.scala:568)
  at scala.concurrent.duration.FiniteDuration.$times(Duration.scala:668)
  ... 43 elided

InputStreamPublisher blocks such that it may not be shutdown

InputStreamPublisher blocks its actor thread when reading from a stream. This causes a problem when the actor system is shutdown as the publisher will never receive its stop message.

The following JVM dump illustrates the thread being blocked. The dump was taken once system.shutdown() and then system.awaitTermination() had been called.

"conductr-akka.actor.default-dispatcher-15" #24 prio=5 os_prio=31 tid=0x00007fddb1af8000 nid=0x6503 runnable [0x0000000115cb1000]
   java.lang.Thread.State: RUNNABLE
    at java.io.FileInputStream.readBytes(Native Method)
    at java.io.FileInputStream.read(FileInputStream.java:234)
    at java.io.BufferedInputStream.read1(BufferedInputStream.java:284)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
    - locked <0x00000007b84491b8> (a java.lang.UNIXProcess$ProcessPipeInputStream)
    at java.io.FilterInputStream.read(FilterInputStream.java:107)
    at akka.contrib.stream.InputStreamPublisher$$anonfun$1.apply$mcI$sp(InputStreamPublisher.scala:66)
    at akka.contrib.stream.InputStreamPublisher$$anonfun$1.apply(InputStreamPublisher.scala:66)
    at akka.contrib.stream.InputStreamPublisher$$anonfun$1.apply(InputStreamPublisher.scala:66)
    at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:169)
    at scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640)
    at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:167)
    at scala.concurrent.package$.blocking(package.scala:123)
    at akka.contrib.stream.InputStreamPublisher.read(InputStreamPublisher.scala:66)

test failure (DirectivesSpec) in Scala 2.12 community build

seen at e.g. https://scala-ci.typesafe.com/job/scala-2.12.x-integrate-community-build/2557/consoleFull :

[akka-contrib-extra] [info] DirectivesSpec:
[akka-contrib-extra] [info] accept directive
[akka-contrib-extra] [info] - should pass to inner route if a matching accept header is present
[akka-contrib-extra] [info] - should reject if no requested accept header is present
[akka-contrib-extra] [info] parameterList directive
[akka-contrib-extra] [info] - should provide a list of parameters to the inner route *** FAILED ***
[akka-contrib-extra] [info]   "List([1, 2, 3])" was not equal to "List([3, 2, 1])" (DirectivesSpec.scala:41)

in the context of the community build, I'll just disable running the tests. we had only added akka-contrib-extra to the community build in the first place because it was a dependency of conductr-lib

maintainers of this repo: feel free to pursue or ignore as you see fit

Processes spawned by NonBlockingProcess inherit file descriptors

On Linux, processes spawned by NonBlockingProcess inherit the JVM's open file descriptors. The underlying cause it due to this NuProcess issue.

The following can be used as a work-around -- spawn bash to run these commands, and then exec the command you wished to spawn:

if [ -d /proc/$$/fd/ ]; then
  for descriptor_path in /proc/$$/fd/*; do
      descriptor="$(basename "$descriptor_path")"
      # Don't close stdin/stderr/stdout (-gt 2)
      if [ $descriptor -gt 2 ]; then
        exec {descriptor}<&-
      fi
  done
fi

exec command arg1 arg2 ...

NonBlockingProcess stdio materializer shutdown causing test timeouts

I've been struggling with some flaky tests that rely on reading stdout returned from a NonBlockingProcess actor. Essentially, my tests sometimes time out waiting for the output, even though I can see it definitely makes it as far as the nuprocess onStdout handler function.

Indeed, I have actually seen NonBlockingProcessSpec occasionally fail with a similar looking error. In fact the CI build for the PR I contributed recently happened to fail with it :

[info] NonBlockingProcessSpec:
[info] A NonBlockingProcess
[info] - should read from stdin and write to stdout *** FAILED ***
[info]   java.lang.AssertionError: assertion failed: timeout (3 seconds) during expectMsg:
[info]   at scala.Predef$.assert(Predef.scala:170)
[info]   at akka.testkit.TestKitBase$class.expectMsgPF(TestKit.scala:404)
[info]   at akka.testkit.TestKit.expectMsgPF(TestKit.scala:828)
[info]   at akka.contrib.process.NonBlockingProcessSpec$$anonfun$1$$anonfun$apply$mcV$sp$4.apply(NonBlockingProcessSpec.scala:40)
[info]   at akka.contrib.process.NonBlockingProcessSpec$$anonfun$1$$anonfun$apply$mcV$sp$4.apply(NonBlockingProcessSpec.scala:29)
[info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.WordSpecLike$$anon$1.apply(WordSpecLike.scala:1078)
[info]   ...
[info] - should allow a blocking process that is blocked to be destroyed
[info] - should allow a blocking process that is blocked to be stopped
[info] - should be able to create the reference.conf specified limit of processes
[info] - should detect when a process has exited while having orphaned children that live on
[info] - should close stdin when input stream terminates

Now, my theory is that the stdioMaterializer is being shutdown before the client (parent of the NonBlockingProcess actor) can consume the data. As although there is logic to only shutdown when nuprocess has deemed the stream closed, there is still no guarantee that the client has fully consumed the stdout/stderr Source's.

I admit I don't fully understand what effect shutting down the materializer has on the SourceQueue/Source returned by materializing Source.queue - but the docs suggest it shuts down all underlying actors and abruptly stops queue processing, so it sounds like a potential issue.

I have already written a patch that replaces stdioMaterializer with a implicit parameter on the actor, meaning it is the clients responsibility to create the materializer (and handle its life cycle accordingly) - happy to submit a PR if you are happy, it also cleans up the code nicely.

Blocking IO dispatcher is misconfigured for its max values

We need to set the following for the blocking io dispatcher thread to kick in its set of max pool size values. If we don't then we end up supporting only about 15 processes per agent in ConductR as the default queue size of -1 will prevent the max values from being used.

akka.process.blocking-process.blocking-io-dispatcher.thread-pool-executor.task-queue-size=1

Change reconnect API

  • outgoingConnection should be renamed bindAndHandle
  • Http.reconnect is too much magic => instead renamve ReconnectingStreamHttp to ReconnectingHttp

/cc @ktoso

NonBlockingProcess not killing child processes on postStop

I don't have a test case to confirm this yet but this is what I observe:

to reproduce:
I run an infinite computation
I throw an exception to trigger the actor restart (default supervision strategy)
the postStop is called and kill the process

observed:
the process is still running despite the process.destroy(true)

my hypothesis is that we need to wait for the process to be fully stopped (by blocking in the postStop with a latch for example).

I'm still investigating, I will let you know if I find anything.

Implement helper classes for Akka Distributed Data and Actor stacking

Implement Akka Distributed Data helper class:

  • ORMapState which provides helper methods dealing with ORMap
  • ReplicatingActor which provides base trait for Actor which provides reference of Akka Distributed Data's replicator

Implement EmptyActor abstract class - provides empty behaviour which is helpful when applying stackable traits.

Remove reconnection facilities

They've proved to be "not enough", and more fancy solutions had to be implemented in projects.
In order to avoid people depending on these we should remove them.

The functionality is (a core building block for it) will be implemented in akka-streams directly.
See these tickets to track progress: akka/akka#16985 akka/akka#16882

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.