Make your service more resilient by providing protection against traffic oversaturation.
Check the website for details: http://iheartradio.github.io/kanaloa
License: Other
real-time monitoring of all the work pipeline metrics
[error] x stop an underutilizationStreak
[error] 'Some(UnderUtilizationStreak(2015-12-14T16:16:45.093,3))' is not empty (AutoScalingSpec.scala:67)
The above two metrics are only reported when all workers are busy.
The tests should ensure the following:
Create an Akka HTTP or Play frontend serving an HTTP interface that talks to a tunable Akka mock backend that mimics a DB service. This mock backend should have a hidden optimal number of concurrent requests, χ, and its throughput should be a function of the number of concurrent requests that peaks at χ and degrades beyond it (see the sketch below).
Use Gatling to test against it, tuning the backend to simulate a backend breakdown.
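For illustration, the tunable throughput curve of the mock backend could be modeled by something like the following minimal sketch. All names here are hypothetical (chosen to echo the optimal-concurrency / optimal-throughput settings shown further down), not an actual part of the stress test code.

import scala.math._

// Throughput peaks at the hidden optimal concurrency χ and degrades beyond it,
// mimicking a DB-like service that breaks down under oversaturation.
object MockBackendModel {
  val optimalConcurrency: Int = 10      // the hidden χ
  val peakThroughput: Double = 100.0    // msgs/second at χ

  // Throughput (msgs/second) as a function of the current number of concurrent
  // requests: rises roughly linearly up to χ and falls off beyond it.
  def throughput(concurrentRequests: Int): Double =
    if (concurrentRequests <= 0) 0.0
    else if (concurrentRequests <= optimalConcurrency)
      peakThroughput * concurrentRequests.toDouble / optimalConcurrency
    else
      peakThroughput * optimalConcurrency.toDouble / concurrentRequests

  // Per-request latency (ms) implied by the throughput curve; this is what the
  // mock backend would schedule before replying to each request.
  def latencyMillis(concurrentRequests: Int): Double =
    concurrentRequests * 1000.0 / max(throughput(concurrentRequests), 1e-6)
}

Tuning optimalConcurrency and peakThroughput down at runtime would then simulate the backend breakdown that the Gatling run is meant to exercise.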
https://www.ietf.org/mail-archive/web/iccrg/current/pdfB57AZSheOH.pdf
Also implement both back pressure
In the "starting" state, there exists a potential case where if there is Work already queued, and the Worker receives another Work message, it will overwrite the queued value.
This ticket might be a no-op if we decide to do away with this state entirely, but since this is such a small fix, it would be super easy to sneak in.
setUp(scn.inject(
  rampUsers(500) over (5.minutes) // rate is mainly controlled by the throttle below
)).throttle(
  reachRps(200) in (5.minutes),
  holdFor(3.minutes)
)
  .protocols(httpConf)
  .assertions(global.responseTime.percentile3.lessThan(5000)) // 95% less than 5s
optimal-concurrency = 10 //optimal concurrent requests the backend can handle
optimal-throughput = 100 //the optimal throughput (msg / second) the backend can handle
buffer-size = 5000
frontend-timeout = 30s
overload-punish-factor = 0 //between 0 and 1; 1 gives the maximum punishment while 0 gives none
kanaloa {
  default-dispatcher {
    workTimeout = 1m
    updateInterval = 200ms
    workerPool {
      startingPoolSize = 20
    }
    autothrottle {
      numOfAdjacentSizesToConsiderDuringOptimization = 18
      chanceOfScalingDownWhenFull = 0.3
    }
    backPressure {
      referenceDelay = 1s
      durationOfBurstAllowed = 5s
    }
    metrics {
      enabled = on // turn it off if you don't have a statsD server and the hostname set as an env var KANALOA_STRESS_STATSD_HOST
      statsd {
        namespace = kanaloa-stress
        host = ${?KANALOA_STRESS_STATSD_HOST}
        eventSampleRate = 0.25
      }
    }
  }
}
dropping data on the floor
a worker with constant failure or timeout should refrain from pulling work as well.
here is a paper to start with.
http://www.diva-portal.org/smash/get/diva2:706244/FULLTEXT01.pdf
Right now, to create a kanaloa dispatcher Props you need an implicit ActorSystem. I'm not entirely sure, but I think this is needed only by the metricsCollector. We should be able to change the code so that the metrics collector is instantiated using the actorRefFactory of the kanaloa Dispatcher. If that's the case, we wouldn't have to use an ActorSystem to create Props, which feels weird.
so that clients can turn on
akka.actor.serialize-messages = on
It's been a while (39 commits) since the 3.0 release. Can you release a new version? Thanks
When the QueueProcessor queries a Queue for its status, the retiring state is not being captured.
"work failed after 0 try(s)" doesnt make much sense. The ResultCheck catch all failure should also be user friendly enough.
which will include tests and integration tests
see project/Publishing.scala for an example of managing release steps
Basic idea:
Currently, the QueueProcessor tracks ResultHistory by way of a Vector[Boolean]. This can be simplified a bit with a circular data structure. One simple idea is to just wrap an Array[Byte] and keep a pointer at the current index, which resets to 0 when it reaches the end of the array.
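A minimal sketch of that circular structure (the class and method names are hypothetical, not kanaloa's actual API): a fixed-size Array[Byte] with a write index that wraps back to 0 at the end of the array.

class CircularResultHistory(capacity: Int) {
  private val buffer = new Array[Byte](capacity)
  private var index = 0   // next position to write
  private var filled = 0  // how many slots hold real data (until the buffer wraps)

  def add(success: Boolean): Unit = {
    buffer(index) = if (success) 1 else 0
    index = (index + 1) % capacity // wrap back to 0 at the end of the array
    if (filled < capacity) filled += 1
  }

  // Success ratio over the recorded history (1.0 when nothing is recorded yet).
  def successRate: Double =
    if (filled == 0) 1.0
    else buffer.take(filled).count(_ == 1).toDouble / filled
}

Unlike a Vector[Boolean] that keeps growing or needs to be copied on every update, this keeps memory fixed and every add is O(1).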
When creating a Dispatcher right now, the Dispatcher constructor returns immediately, even though behind the scenes there is still some initialization going on.
We should change this to return a Future[Dispatcher], and only complete the Future when all underlying Actors have been fully initialized.
In order to achieve this, we will need to spawn an intermediate Actor which creates a Promise. When all underlying Actors are ready, they report to this Actor, which then fills the Promise with the Dispatcher instance.
This means that some of the existing Actors will need to report their 'ready' status to this Actor. The details can be hashed out a bit more if/when we decide to do this.
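A rough sketch of that intermediate actor, under the assumption that each underlying actor sends a Ready message once initialized (all names here are hypothetical, not the actual kanaloa design):

import akka.actor.{ Actor, ActorRef, ActorRefFactory, Props }
import scala.concurrent.{ Future, Promise }

object DispatcherInit {
  case object Ready

  // Counts Ready reports; once all expected reports have arrived it completes the
  // Promise with the result (e.g. the Dispatcher instance) and stops itself.
  private class InitWatcher[T](expected: Int, promise: Promise[T], result: T) extends Actor {
    private var readyCount = 0
    def receive: Receive = {
      case Ready =>
        readyCount += 1
        if (readyCount == expected) {
          promise.success(result)
          context.stop(self)
        }
    }
  }

  // Returns the watcher (for the children to report to) and the Future the caller keeps.
  def watch[T](factory: ActorRefFactory, expected: Int, result: T): (ActorRef, Future[T]) = {
    val promise = Promise[T]()
    val watcher = factory.actorOf(Props(new InitWatcher[T](expected, promise, result)))
    (watcher, promise.future)
  }
}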
In some cases load shedding might be more useful than back pressure. We need to implement this based on a proven algorithm.
Just tracking this:
Right now in the waitingForRoutee state, if there is Work queued and the Worker receives a Terminated(queue) or a NoWorkLeft message, it will stop itself and not execute the Work.
This is also a non-issue if we do away with the waitingForRoutee state.
Right now it's either fail-and-retry or fail-and-forget; we could use some ability to do more sophisticated error handling.
As suggested by @nsauro
One idea is to provide a Backoff trait, which gets called for every timeout. An instance would need to be created per worker, and it is guaranteed to be called in a thread-safe manner, since it would be called from within the Actor's receive function. The idea is that the user can provide their own implementation of how they want to handle timeouts, e.g. exponential backoff.
The trait's interface would just be
def handleTimeout(msg?) : Option[FiniteDuration]
We could easily provide a few different, basic implementations, including this algorithm here.
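A rough sketch of what the trait and one basic implementation could look like (the msg parameter type and all names are left open in the issue, so everything here is hypothetical; one instance per Worker, mutated only from inside the Actor's receive, so no synchronization is needed):

import scala.concurrent.duration._

trait Backoff {
  // Called on every timeout; returning None could mean "stop backing off / give up".
  def handleTimeout(msg: Any): Option[FiniteDuration]
}

// One possible basic implementation: exponential backoff capped at a maximum delay.
class ExponentialBackoff(
    initial: FiniteDuration = 50.millis,
    max: FiniteDuration = 10.seconds,
    factor: Double = 2.0) extends Backoff {

  private var next = initial

  def handleTimeout(msg: Any): Option[FiniteDuration] = {
    val delay = next
    next = Duration.fromNanos((next.toNanos * factor).toLong) min max
    Some(delay)
  }
}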
The Worker is a bit complex, and naturally as a result, some things get lost in the fold. In order to start thinking of ways to reduce complexity, one idea floated is to eliminate the "waitingForRoutee" state, and instead, if a Worker loses its Routee, to just kill the Worker and respawn a new one.
Scenarios
The orchestration of shutdown between QueueProcessor, Queue and Workers is a bit duplicative in terms of who is watching whom.
Currently the QueueProcessor receives the initial Shutdown message, and then it tells the Queue to shut down. The Queue then sends NoWorkLeft to the Workers, which begins their termination. However, if the Queue still has Work messages queued, it is unclear what happens to them (I think they just get dropped?). The Queue then enters the retiring state, where it waits for a RetiringTimeout message. Once it receives this, it sends more NoWorkLeft messages to the Workers and then shuts itself down. This termination is watched by both the QueueProcessor and the Workers, which then react to it further. The QueueProcessor will send Retire signals to the Workers.
I don't think there are any real adverse effects here, other than the potential Work being dropped, which I need to verify, but this could be simplified a bit.
realtime-job-service java.net.SocketException: No buffer space available
realtime-job-service at sun.nio.ch.DatagramChannelImpl.send0(Native Method) ~[na:1.8.0_66]
realtime-job-service at sun.nio.ch.DatagramChannelImpl.sendFromNativeBuffer(DatagramChannelImpl.java:521) ~[na:1.8.0_66]
realtime-job-service at sun.nio.ch.DatagramChannelImpl.send(DatagramChannelImpl.java:498) ~[na:1.8.0_66]
realtime-job-service at sun.nio.ch.DatagramChannelImpl.send(DatagramChannelImpl.java:462) ~[na:1.8.0_66]
realtime-job-service at kanaloa.reactive.dispatcher.metrics.StatsDActor.flush(StatsDClient.scala:225) [kanaloa_2.11-0.2.1.jar:0.2.1]
realtime-job-service at kanaloa.reactive.dispatcher.metrics.StatsDActor.kanaloa$reactive$dispatcher$metrics$StatsDActor$$doSend(StatsDClient.scala:192) [kanaloa_2.11-0.2.1.jar:0.2.1]
realtime-job-service at kanaloa.reactive.dispatcher.metrics.StatsDActor$$anonfun$receive$1.applyOrElse(StatsDClient.scala:169) [kanaloa_2.11-0.2.1.jar:0.2.1]
realtime-job-service at akka.actor.Actor$class.aroundReceive(Actor.scala:480) [akka-actor_2.11-2.4.0.jar:na]
realtime-job-service at kanaloa.reactive.dispatcher.metrics.StatsDActor.aroundReceive(StatsDClient.scala:154) [kanaloa_2.11-0.2.1.jar:0.2.1]
realtime-job-service at akka.actor.ActorCell.receiveMessage(ActorCell.scala:525) [akka-actor_2.11-2.4.0.jar:na]
realtime-job-service at akka.actor.ActorCell.invoke(ActorCell.scala:494) [akka-actor_2.11-2.4.0.jar:na]
realtime-job-service at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) [akka-actor_2.11-2.4.0.jar:na]
realtime-job-service at akka.dispatch.Mailbox.run(Mailbox.scala:224) [akka-actor_2.11-2.4.0.jar:na]
realtime-job-service at akka.dispatch.Mailbox.exec(Mailbox.scala:234) [akka-actor_2.11-2.4.0.jar:na]
realtime-job-service at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [scala-library-2.11.7.jar:na]
realtime-job-service at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [scala-library-2.11.7.jar:na]
realtime-job-service at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.11.7.jar:na]
realtime-job-service at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.11.7.jar:na]
It's not very urgent though because there is no extra external dependency required by this feature.
This was originally meant (I think) to time-cap the execution of a PullingDispatcher. It just adds some complexity and can probably be removed. If there is a need to time-cap a Dispatcher, a user can easily do this by scheduling a Shutdown message.
Currently the circuit breaker is per worker, which is not optimal.
so that statsD reporting can be turned off in an override config file.
That way we can have per-routee metrics.
Now that the kanaloa system is growing bigger with more types of actors, it might be beneficial to use typed actors instead. This is up for triage.
A type class for creating an ActorRef is probably easier to use.
If something sends a message to a Queue, while it is retiring, it should reject it in the same fashion as it rejects work when it is overburdened. In this case, the "retiring" message should be a reason for QueueRejected.
Without this, a sender cannot correlate this rejection with the message, and as such cannot properly recover.
Right now the dequeue rate metric is collected in Queue; this metric is used by AutoScaling, but the approach it takes is problematic because it also counts Timeouts and Failures as successful dequeues. We shouldn't count Timeouts as successful dequeues.
We should probably differentiate between a backend capacity rejection and failures due to other reasons (such as invalid input or external errors). This is indicated in #79
To avoid some goofy effects from $conforms, which can implicitly turn a T into an identity T => T.
I'd like to be able to configure the max length of errors. Currently, it's using the default value of a method here:
Right now kanaloa is not cluster aware, so, in a cluster, it will sit in front of a cluster-aware router and let the router forward all the messages to remote routees (backend actors). This issue is to mitigate the situation where a portion of the routees become unresponsive (not processing messages) due to some anomaly. In such a case, although the other routees are still working perfectly fine, the router will continue to forward the same portion of traffic to the problematic ones. Ideally we want the problematic routees to be avoided or de-prioritized, and just let the working routees take on more work.
Instead of using the cluster-aware group, we use a pool to actually deploy Workers to the routee side. This way each Worker will be bound to a backend actor (through path or ref), and its pulling speed will also be bound to the handling speed of that backend actor. We can also have a worker-specific circuit breaker that breaks for a certain backend.
1. The worker needs to be serializable, which includes the ResultChecker, which could be an anonymous function (closure) right now. We might require a serializable ResultChecker interface for cluster-aware kanaloa dispatchers.
2. There might be dynamic settings (e.g. total pool size, global circuit breaker) that need to be distributed across the cluster now, although there might be a way we can avoid this.
Such as taking in a simple function Req => Future[Either[Any, Response]] and creating an actor out of it (see the sketch below).
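A hedged sketch of what such a type class could look like (BackendAdaptor, fromFunction and the surrounding shape are made-up names for illustration, not kanaloa's actual API): anything with an instance can be turned into the ActorRef the dispatcher sends work to.

import akka.actor.{ Actor, ActorRef, ActorRefFactory, Props }
import scala.concurrent.Future

trait BackendAdaptor[T] {
  def apply(backend: T)(implicit factory: ActorRefFactory): ActorRef
}

object BackendAdaptor {
  // Instance for a plain function Req => Future[Either[Any, Response]]: wrap it in
  // an actor that evaluates the function and pipes the eventual result to the sender.
  implicit def fromFunction[Req, Response]: BackendAdaptor[Req => Future[Either[Any, Response]]] =
    new BackendAdaptor[Req => Future[Either[Any, Response]]] {
      def apply(f: Req => Future[Either[Any, Response]])(implicit factory: ActorRefFactory): ActorRef =
        factory.actorOf(Props(new Actor {
          import akka.pattern.pipe
          import context.dispatcher
          def receive: Receive = {
            case req => f(req.asInstanceOf[Req]) pipeTo sender() // unchecked cast; sketch only
          }
        }))
    }
}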
A few issues:
delayBeforeNextWork is only meant to be applied once, either when sending work to the routee or when asking for Work. Currently, it is never reset. Let's remove all references to this as parameter functions and only use the global one. This should help clear things up.
Codacy coverage isn't working.
to something easier to use.
core, test, example, statsD (need to make the loading logic dynamic, or use an implicit)