Comments (80)
We are starting a new project and Actors model fits perfectly. I wonder that using Kotlin Actors requires to mark all code with @ObsoleteCoroutinesApi
.
Do we have any progress on the actors implementation replacement?
from kotlinx.coroutines.
Hi :) Are there any updates about new actors API?
from kotlinx.coroutines.
What is your suggestion for those who want to use actors in the meantime (until actors come out of the experimental stage)? I'm currently working on a multiplatform library, so using coroutines to enable concurrency is very convenient, but I'm a bit puzzled about the best way (currently) to go about implementing something like a complex actor in your examples (A class which has suspending functions which work on shared mutable state).
from kotlinx.coroutines.
Once complex actors are implemented, what happens to the existing actor? Is it going to be removed?
Would it be possible to release the existing actor as a separate artifact for legacy support? I'm asking because I would like to get my company to adopt actor usage (I think it's an awesome way to build/update state), but the @ObsoleteApi
causes a bit of hesitancy for us to adopt within our company.
One approach we've considered is completely re-implementing the actor ourselves, but I think that risks some subtle bugs around concurrency and exception handling
from kotlinx.coroutines.
The key design consideration here is how a typical actor's code is going to look like. It is not clear in your example. The goal should be to minimize the amount of boilerplate that users of this library code are forced to write. I was thinking about something along the following lines:
abstract class Actor<T>(scope: ActorScope<T>): ActorScope<T> by scope {
abstract suspend fun onReceive(msg: T)
}
An abstract class lets us share some common state among all actor implementations (in this case, the fact that ActorScope
is provided to all the functions defined inside an actor impl). It also makes sense to share "for messages" loop, as every typical actor is going to have one, and leave only onReceive
for override.
With this bases class, the specific actor implementation looks like this:
class MyActor(scope: ActorScope<MyMsg>): Actor<MyMsg>(scope) {
// your private state here
override suspend fun onReceive(msg: MyMsg) {
// ... your code here ...
}
}
There is still some boiler-plate (MyMsg
and scope
are mentioned twice), but it is not clear how to further reduce it.
Now, you can define a generic actorOf
function to start any kind of actor:
fun <T> actorOf(
context: CoroutineContext,
init: (ActorScope<T>) -> Actor<T>
): ActorJob<T> = actor<T>(context) {
val instance = init(this@actor)
for (msg in channel) instance.onReceive(msg)
}
Using this function you can start your custom actor with a natural-looking and easy-to-read invocation actorOf(CommonPool, ::MyActor)
with ability to further customize construction of your actor impl by providing an explicit lambda for an init
block.
from kotlinx.coroutines.
Let me record here a draft design that I current have with respect to complex actors. The idea is to radically reduce boiler-plate that is required for writing actors that accept multiple message types by providing a handy facility that totally avoids the need to write sealed class
for messages by following this pattern:
class MyActor : Actor() {
// actor state is here, must be private
init {
// optional state initialization code
}
// public actor operation without result
suspend fun operationA(someParams...) = act {
// logic here
}
// public actor operation with result
suspend fun operationB(otherParams...): Result = actAndReply {
// logic here
}
private fun helper() { /* regular code, can access state */ }
}
The idea is that all public
functions defined on actor must be suspend
and must invoke a special function (tentatively called act
here) that captures "message parameters" (function parameters) and stores the message in the actor's inbox for processing. When response to the caller is needed, then a separate function (tentatively called actAndReply
here) is used.
We can have IDE inspections to help avoid "sharing" pitfalls to verify that all of the actor's state is private and all the public functions are properly written using act
pattern. IDE inspection can also help to ensure that only immutable types are passed in/out of actor or the proper defensive copies are made on mutable data (the latter is hard to ensure, though, without complicated reference capability annotations akin to the ones used in Pony language).
The reference to MyActor
instance serves as a type-safe reference to an actor and should implement a Job
interface. The actor is started lazily on the first invocation of any of its public (acting) functions or by explicitly invoking start()
. Lazy start feature prevents races when systems of communicating actors are written like this:
class MySystem {
private val actorA = object : Actor() {
// somewhere sends message to actorB
}
private val actorB = object : Actor() {
// somewhere sends message to actorA
}
suspend fun facadeOperation() { actorA.doSomethig() }
}
We shall also consider changing default to CoroutineStart.LAZY
for a simple actor { ... }
builder, too.
from kotlinx.coroutines.
Eagerly awaiting!
from kotlinx.coroutines.
I'd do something along these lines:
class MyActor {
// your private state here
suspend fun onReceive(msg: MyMsg) {
// ... your code here ...
}
}
fun myActorJob(): ActorJob<MyMsg> = actor(CommonPool) {
with(MyActor()) {
for (msg in channel) onReceive(msg)
}
}
You can also pass actor's channel
to MyActor
constructor if it needs access to it.
from kotlinx.coroutines.
Also, if we are starting on a path of generic actor-based programming, then we are inevitably going to be compared with other actor-based frameworks, including comparisons on the basis of performance. It does not mean that we have to worry about performance up-front, but we have to keep our designs, at least, optimizable in the future.
That is my concern for overridable invoke/run
method and other potential design decisions. Let me elaborate. A generic actor can, for example, start multiple concurrent actors to process a single mailbox concurrently if the actor's logic is stateless by itself. However, a stateful actor is must typically process its mailbox in serial fashion, e.g. it is a single consumer of its mailbox.
Current implementation of Channel
is multi-consumer/multi-producer queue which makes it widely-applicable (it can do both use-cases), but it inevitably costs some performance. One might implement an optimized version of channel for single-producer case to support stateful-actor use-case with better performance, but then we have to ensure that it is not possible for an actor to accidentally launch additional coroutines to read from its mailbox (to make the design less error-prone and more fool-proof). It means that we should actually hide actor's ReceiveChannel
from the actor itself.
from kotlinx.coroutines.
So will actor api change in future?
from kotlinx.coroutines.
what is the latest on the timeline/plans for actor api updates?
from kotlinx.coroutines.
@fvasco Yes, when you ask and actor and want a result back the proper design would be to have a suspend fun
with a normal (non-deferred) Result
. However, please note that this whole ask & wait pattern is an anti-pattern in actor-based systems, since it limits scalability. Well-designed actor-based system do not work that way.
Internally, actors are still implemented on top of channels.
from kotlinx.coroutines.
The more I use actors, the more necessary I find frameworks specially created to deal with the common actor patterns. I don't think we should create another actor framework but to make easier to integrate the amazing kotlin functionalities with the frameworks we already have.
And there are already two mature frameworks on the JVM world: Akka and Quasar. I didn't use Quasar, but as it uses their own coroutines, they seem to be closer to the Kotlin aproach. I have more experience with Akka, where actors suspendable actors should be useful when dealing with blocking code (because of IO or high computation costs).
My plan right now is to develop a Akka Typed Kotlin DSL that makes easier and nicer to call the Scala/Java APIs from Kotlin. That is not very ambitious and in fact it can be done with a very small project. It is very easy to create suspendable actors that stage messages while the actor is suspended. The main problem there is that it may be not very efficient. What I would really like is to create an Akka Actor System that is able to execute suspendable actors natively, but:
- Doesn't seem easy.
- Is more related to Akka itself than Kotlin
By the way, I want to emphasize that Kotlin actors are fantastic for most cases. Frameworks like Akka are needed once you want do more complex things like scheduling (although it wouldn't be difficult to implement in Kotlin actors), clustering (using location transparency) or persistence (for event source systems, for example). I'm thinking on migrate the application I wrote using Kotlin actors to Akka just because of the last two features, but Kotlin actors allow me to rapidly create the first version of it and most importantly it was very easy to think on concurrency and error management on a higher level and I'm sure that by using Kolin actors the application is more resilent that it would be if it was implemented on other non-actor based frameworks .
from kotlinx.coroutines.
Currently it is possible for actor to send a message to yourself
because ActorScope
exposes Channel
and not only ReceiveChannel
(being fully aware of potential deadlock when channel is at capacity).
It would be awesome if you keep this in mind and make it possible with the new design as well.
from kotlinx.coroutines.
I wrote an implementation: https://gitlab.com/snippets/1957446
For one-way requests, it has an almost identical performance to the current actor builder (I compared it against the sample code in https://kotlinlang.org/docs/reference/coroutines/shared-mutable-state-and-concurrency.html with the same optimisations, dispatchers, etc on both sides
For two-way requests, it is very marginally faster than the current actor implementation. However, it is much better than the current implementation because you can do the entire request and response in one line in the main(). It is also much easier to implement it in the actor itself, as there is no when clause. It also supports running with concurrency, when you have no mutability problems (but then why not just use normal functions?). Error handling should be fine too - errors pass through. I could probably make a cancelOnError parameter too.
However, this version makes it very easy to leak the actor scope. So I made https://gitlab.com/snippets/1957455 which is no more restrictive, but uses subclassing to simplify the boilerplate and syntax, and also to prevent leaking, due to the protected fields. However, this can't be used when there is another superclass, because multiple inheritance is forbidden. It also doesn't work with kotlinx.serialization.
I'm putting this here in case I'm going about it completely the wrong way, in which case I won't make a PR.
EDIT: I ran my benchmarks starting all the addition operations in parallel, as follows:
...
coroutineScope {
repeat(1000000) {
launch {
...
}
}
}
...
Upon removing the launch block from the benchmark (for 2-way communication), my solution accelerated to about double the original speed, whereas the current actor slowed down to 0.2 of the original speed, and to 0.5 for 1-way communication. All the benchmarks of my code were 2-way, because it's essential an RPC so making it 1-way doesn't make sense.
EDIT 2: In the first 2 iterations I forgot to make the actual actions suspending. Here is a documented, fixed version: https://gitlab.com/snippets/1957572
from kotlinx.coroutines.
Regardless of how the actor piece is brought in- it would be good to have the ability to add supervision strategies like being able to restart on exception
from kotlinx.coroutines.
Hi @elizarov, it's 2024. What about the new API?
Dude doesn't even work for Jetbrains anymore mate
from kotlinx.coroutines.
I have been working with Kotlin actors for months and I think they have very low boilerplate in comparison with untyped Akka actors (where usually you need to extend a class if using the Java dsl, the only one I can use from Kotlin). But with the introduction of Akka Typed, the boilerplate was reduced a lot and they are almost as compact as Kotlin actors.
When I found Kotlin actors problematic is when you try to go beyond the most trivial use cases. I understand that I may have a bias towards Akka because I'm most use of it, but I find quite complex to implement in Kotlin some common actor patterns like send a message to yourself, create a ticker/scheduler, create an adaptor or receive messages from different type hierarchies. Guys from Akka have been trying to solve this problems for 10 years after three attempts, it seems that they have found a nice API that is at the same quite time safe at compile, easy to use and quite compact.
Simple Akka Typed actors are written as a function in a way that is almost equal to current Kotlin actors and the most complex patterns are delegated on a object call ActorContext that is obtained as a parameter.
Akka Typed is not perfect either. There are some use cases that are not easy to implement. For example the common case on which you want to stage all messages received by an actor until some message arrives. On Akka Typed you need to create a behavior to do that. On Kotlin we can implement the ask pattern using suspending functions, suspending the actor until the answer is found.
The Akka Typed has some flaws that make it not completely secure at compile time. For example some functions on ActorContext can be called from any thread but most of them can only be called from the actor thread. That is clearly explain on the javadoc of each method, but it is easy to forget. I have been working on a small Akka Typed Kotlin DSL where can be called from any thread are normal methods, but the others are suspending methods that are executed on the actor thread by using the ask pattern.
I'm not saying Kotlin should copy the Akka Typed API, just learn from it and improve it. There are still problems that are difficult to solve by Akka where Kotlin has a big advantage.
The way to use the actor proposed by Elizarov has its advantages, but I would prefer to have an ActorRef/Channel to send messages and then create extensions functions that call act/tell or actWithReply/ask and being able to define actors (even complex actors!) with behaviors/functions instead of creating classes.
from kotlinx.coroutines.
Waiting until the actor has processed the message is sometimes useful, but is always dangerous and easily leads to deadlocks. I would not recommend to support it.
from kotlinx.coroutines.
Hmm, baking this framework into the language seems as a bit weird approach to take.
from kotlinx.coroutines.
Based on the description of Stateful Actor and the idea of reducing boilerplate, I would like to propose following abstraction (inspired by orbit-mvi's Container). Let me know if anything might go wrong with this implementation.
cc: @elizarov @fvasco
interface ActorContext
abstract class StatefulActor(
private val parentScope: CoroutineScope,
bufferCapacity: Int = 64
) {
private val dispatchChannel = Channel<suspend ActorContext.() -> Unit>(bufferCapacity)
private val initialized = AtomicBoolean(false)
private fun initializeIfNeeded() {
if(initialized.compareAndSet(false, true)) {
parentScope.launch {
for (action in dispatchChannel) {
launch { context.action() }
}
}
}
}
fun shutdown() {
dispatchChannel.close()
}
protected fun act(action: suspend ActorContext.() -> Unit) {
initializeIfNeeded()
dispatchChannel.trySendBlocking(action)
}
abstract val context: ActorContext
}
Here's one actor implementation using this API
class MyActor(scope: CoroutineScope) : StatefulActor(scope), ActorContext {
private var clickCounter = 0
override val context: ActorContext get() = this
fun increment() = act { clickCounter++ }
fun printCount() = act { println(clickCounter) }
}
I found it surprisingly fast as well (Unless I'm making some mistake measuring the performance).
Playground with above snippets and measuring setup: https://pl.kotl.in/RHPpCbcUT
from kotlinx.coroutines.
Makes sense. I'd keep an extended version. Maybe rename actor.invoke()
to actor.run()
(similar to Thread's run
method). I'm not sure that an actor base class should be called ActorBase
. I'd either keep it an Actor
to be short or name it AbstractActor
.
from kotlinx.coroutines.
@gortiz All valid points. However, we do not plan to turn kotlinx.coroutines
into an actor-based framework with actor refs, actor systems, actor contexts, replies, etc. In terms of kotlinx.coroutines
actor is just a code organization pattern -- a coroutine plus an incoming channel. Nothing more. Something more than that has to be living in a separate library.
from kotlinx.coroutines.
Not yet. We are busy with Kotlin Flow for now, which is a higher-priority for us to finish.
from kotlinx.coroutines.
But by using Kotlin's foundation of async programming it would be easier to implement the same in Kotlin than it was for Akka in Scala. Kotlin has by default all primitives for that (channels, queues, back pressures, contexts(coroutines), supervisors).
I think we are not talking about the same thing. One thing is to create a toy project that implement this kinds of things on a biased way and another is to have a product that can be used by different people of different organizations. Of course, it would be easier to create a new Akka on top of Kotlin Coroutines than to create it from zero, but it is still has a huge cost but from my point of view, very little benefits (which are mainly multi platform compilation and to be written in Kotlin instead of Scala because we like the former more).
IMHO it is not worth it and it doesn't seem that Jetbrains is interested on that.
I'd also would like to have something similar as Akka, but due lack of Scala knowledge in our team and unknown future of Scala itself we are avoiding it.
Akka is used on tons of real world open source projects including Apache Flink (for sure) and Spark (if I remember correctly) and commercial (there is a list here)and is developed by a historic company. To be honest, I would trust Akka and Lightbend more than an actor framework created by a individual contributors and/or small companies.
I don't love Scala and I'm not an expert on it, but the documentation is quite good, almost all examples are in Java and the people of Lightbend is very open to help developers even if they don't pay for support.
The actor paradigm is not that popular and between Akka and Quasar there are already too many actor frameworks on the JVM and I'm afraid that there will be even more once Project Loom is released. In my opinion, if there is no very important technical reason to create a new framework, I wouldn't like to create a new one to compete with Akka, that has a big enough community, has been proved on production, is evolving quite fast and, very important, is compatible with the language I love: Kotlin
I think my point is clear and I wouldn't like to tangle even more this thread (which I think I personally shifted too much from their original topic)
from kotlinx.coroutines.
I see where Elizarov is coming from, I also think that the pony way for Actors is really awesome but they have the actor support in the language not just in a library.
suspend fun operationA(someParams...) = act {
// logic here
}
Would look much better and clearer if it would be
act operationA(someParams...) = logic here
I asume that new keywords are costly and won't get added into the language that easy. But otherwise also the "Class" keyword could be replaced by an "Actor" keyword similar how we have the "Object" keyword does it to (one of the reasons why I prefer Kotlin to Java). I'm writing on mobile and the preview button doesn't work right now, sry if the code is bad formatted.
from kotlinx.coroutines.
Note that Swift is currently working on actor model support, I wonder if some inspiration could be taken:
Food for thoughts:
https://news.ycombinator.com/item?id=26480922
Edit:
Active review:
https://forums.swift.org/t/se-0306-actors/45734
Review by Chris latner:
https://forums.swift.org/t/se-0306-actors/45734/4
from kotlinx.coroutines.
Hi @elizarov
We can have IDE inspections to help avoid "sharing" pitfalls to verify that all of the actor's state is private and all the public functions are properly written using act pattern.
I considering a lot this phrase, I wish to avoid this type of language support (workaround) around as much is possible.
To enforce this we can encapsulate the actor's state, so the programmer is forced to invoke the act
function.
I try to explain better myself: an actor requires: a queue, some message types and an optional state.
I encapsulate the state
in a ActorState
type, and I can operate on it using some functions, like update
(names are only examples).
The follow interface can encapsulate the state
interface ActorState<T> : CoroutineScope {
suspend fun update(block: T.() -> Unit)
}
so we define a simple builder (may requires more parameters):
fun <T> actorState(state: T): ActorState<T> = TODO()
Finally an actor's example
class Counter /* no superclass required */ {
private var state = actorState(object {
var count = 0
})
suspend fun increment(amount: Int = 1) = state.update { count += amount }
}
That's all, I cannot more avoid the queue.
from kotlinx.coroutines.
@adam-arold The plan roughly outlined here (for "writing complex actor") is approximately how far it will go as a part of kotinx.coroutines
in a foreseeable future. If you want more, I'd suggest to start writing your own coroutine-based actor framework. I'm sure it'll be quite popular if you make it open source.
from kotlinx.coroutines.
Kotlin actors are very low level and this patterns/strategies are not implemented yet and for what @elizarov said, they are not going to be implemented soon. As @Globegitter said, they are not trivial to implement.
As I said earlier, my personal recommendation is using Akka Typed with Kotlin Coroutines. They match so easily that seem to be created one for the other. I proposed a talk about that for KotlinConf, but unfortunately it wasn't chosen. I will try to blog about Akka Typed + Coroutines after the summer. Meanwhile I encourage you to try yourself!
from kotlinx.coroutines.
from kotlinx.coroutines.
Let me voice my concern that adding such functionality to the core is a start of a slippery road of turning it in into an actor-based programming framework. There are lots of issues to be addressed in a large-scale actor-based programming and the corresponding support libraries are only bound to grow over time. Maybe we should think-through what other things needs to be added, beyond an actor base-class and a construction function, and have a separate module to support all of that.
from kotlinx.coroutines.
Even with capacity = 0
the Actor
is asynchronous. You can send a message to an Actor
and continue working on your code, while actor processes your message concurrently with your code. The Mutex
, on the other hand, is always synchronous. No concurrency. That is, conceptually, why solutions based on Mutex
/synchronized
do not scale well.
from kotlinx.coroutines.
@elizarov does this mean that I should not wait for improvements to the current actor
? The documentation points to this issue and states that the current actor
is obsolete and they will get replaced with the introduction of complex actors.
from kotlinx.coroutines.
I'm planning to work on a Kotlin approach (not a wrapper) because I'm going to write common
code which will work on all Kotlin-supported platforms. For this reason, Akka and Quasar are not applicable in my use case.
from kotlinx.coroutines.
Hi, @tristancaron,
pay attention, MutableStateFlow
does not provide a CAS mutator (yet), _counter.value++
is not atomic.
from kotlinx.coroutines.
get and set value
is thread safe, however _counter.value = _counter.value + 1
is not atomic, so you can successfully write a wrong value.
from kotlinx.coroutines.
Hi, @handstandsam,
I exposed above some classic data race issues.
A single writer does not need synchronization (with other writers).
Obviously multiple concurrent writers, even in the same context, may fail (https://kotlinlang.org/docs/reference/coroutines/shared-mutable-state-and-concurrency.html#mutual-exclusion).
from kotlinx.coroutines.
Hmm, baking this framework into the language seems as a bit weird approach to take.
I agree, it should be a library based approach (like it is now)
from kotlinx.coroutines.
what is the latest on the timeline/plans for actor api updates?
Reddit AMA they said possibly start work in 2022? #485 (comment)
from kotlinx.coroutines.
Hi @elizarov, it's 2024. What about the new API?
from kotlinx.coroutines.
Hi @elizarov,
hi solved this issue in the same way, but an helper class looks like a miss of abstraction.
I want propose to introduce a dedicated interface for actor, something like:
interface Actor : ActorScope {
fun invoke() : Unit
}
and the new function
fun launch(context: CoroutineContext, ..., actorBuild: (CoroutineContext) -> Actor) : ActorJob
I want to propose a draft implementation in the next week, what do you think about it?
from kotlinx.coroutines.
onReceive
method is comfortable to use, but I want to expose you my concerns.
The actor
function uses the iterable pattern, instead the Actor
class use visitor pattern. Personally I consider the iterator pattern a better choice becouse it is possible to detect actor termination (end of iteration).
Using the explicit itaration in Actor
makes it cut-and-paste compatible with the actor
function.
I suggest to mantain Actor
as public interface and to provide a comfortable implementation
public abstract class ActorBase<E>(actorScope: ActorScope<E>) : Actor<E>, ActorScope<E> by actorScope {
override suspend fun invoke() {
for (message in channel)
onReceive(message)
}
public abstract suspend fun onReceive(message: E)
}
Finally actorOf
misses of some parameters, we should keep this one, an extended version:
public fun <E> actorOf(
context: CoroutineContext,
capacity: Int = 0,
start: CoroutineStart = CoroutineStart.DEFAULT,
builder: (ActorScope<E>) -> Actor<E>
): ActorJob<E> = actor(context, capacity, start) {
val actor = builder(this)
actor.invoke()
}
or both?
from kotlinx.coroutines.
At this time of development, a stateless concurrent actor can be implemented using a BroadcastChannel
to create a different ActorScope
for each worker actor.
However your concern sounds right to me, but I suggest to mantain an uniformity of actor
function and Actor
class
public interface Actor<in E> : CoroutineScope {
public suspend fun onReceive(message: E)
}
public fun <E> actor(
context: CoroutineContext,
capacity: Int = 0,
start: CoroutineStart = CoroutineStart.DEFAULT,
onReceive: suspend CoroutineScope.(E) -> Unit
): ActorJob<E>
In such case we deprecate ActorScope
in favour of Actor
.
from kotlinx.coroutines.
The idea of concurrent stateless actor is to have multiple instance working on the same mailbox in round-robin fashion as opposed to broadcasting messages to all actors. I actually considered adding an additional concurrency: Int = 1
optional parameter to actor
builder to cover this use-case, so that you can launch multiple actors under the umbrella of a single ActorJob
. However, it is not that really straight-forward as it does require us to start dabbling into supervision strategies, because multiple running coroutines do make it urgent to figure out what you are going to do when one of them crashes.
from kotlinx.coroutines.
Yes obviously, I was wrong.
As you proposed above I consider preferable a different module for actor supervisor and so on.
In my opinion this module should offer a common pool of interfaces for asynchronous programming, an "actor" module should implement more complex use cases, but I hope that the "core" module can interoperate well with existent solutions like Quasar or Akka.
So in my limited scope of view this "core" module shoud propose an "Actor" as a "Producer" dual, all other implementation should extends core's interfaces and provides its own builders.
from kotlinx.coroutines.
Hi @elizarov
I wish expose some consideration about your -interesting- proposal.
Actor
, as intended until now, is the dual of Producer
, so your design does not help to write complex actor.
Your proposal looks like a fully synchronized, non blocking class, which is equally interesting.
I wrote a dummy implementation using your requirement plus one: avoid abstract superclass; so I wrote an task synchronization helper (TaskQueue
here) and a case of use.
class TaskQueue(
val context: CoroutineContext = DefaultDispatcher,
val mutex: Mutex = Mutex(),
lazyInit: (suspend CoroutineScope.() -> Unit)? = null
) {
private var lazyInit: Deferred<Unit>?
init {
this.lazyInit = lazyInit?.let {
async(context, start = CoroutineStart.LAZY, block = it)
}
}
/**
* Force lazy initialization
*/
suspend fun init() {
lazyInit?.run {
await()
lazyInit = null
}
}
suspend operator fun <T> invoke(block: () -> T): T {
init()
return mutex.withLock(this) {
withContext(context) {
block()
}
}
}
}
class HttpSession {
val start = Instant.now()
private lateinit var state: MutableMap<String, String>
private val taskQueue = TaskQueue {
state = mutableMapOf()
}
suspend fun get(key: String) = taskQueue {
state[key]
}
suspend fun set(key: String, value: String) {
taskQueue {
state[key] = value
}
}
}
Plus: using this implementation and issue #94 makes easy to implement a read task queue and a write task queue.
from kotlinx.coroutines.
@fvasco Indeed, it does look like a "like a fully synchronized, non blocking class", but it is not one. There are lots of similarities between monitor-based synchronization (like synchronized methods in Java) and actor-based programming model (like behavior functions in Pony). But there are important differences, too. Let me cite Pony documentation here:
A behaviour is like a function, except that functions are synchronous and behaviours are asynchronous.
Let's take a look at it in the context of Kotlin. First of all, we don't need new primitive for a "fully synchronized class". We already have Mutex
. So, a fully synchronized class can be written like this:
class HttpSessionSync {
private val mutex = Mutex()
// state initialization does not have to happen under lock
private var state: MutableMap<String, String> = mutableMapOf()
suspend fun set(key: String, value: String) = mutex.withLock {
state[key] = value
}
// etc
}
Notice the boilerplate here. We have to define
private val mutex = Mutex()
every time we use this pattern, so some kind of out-of-the boxMutex
abstract base class might help. Shall we makeMutex
open? Anyway, we don't want to promote this pattern, so we will not further discuss it in this thread.
You've made an interesting observation that requiring base class for complex actors is not good idea, so while, IMHO, we should give an option of using one, it should not be a requirement. Let's sketch implementation of a complex actor without having to use a base class:
class HttpSessionActor {
private val actor = Actor()
// state initialization does not have to happen under lock
private var state: MutableMap<String, String> = mutableMapOf()
suspend fun set(key: String, value: String) = actor.act {
state[key] = value
}
// etc
}
What is the difference here as compared to Mutex
-based version? The difference is that an Actor
has an inbox channel and sending messaged to an actor can be decoupled from their execution. When HttpSessionActor.set
is invoked, the invoker can go on with it own work while HttpSessionActor
is busy, unless actor's mailbox capacity is exhausted. In the latter case, the invoker will have to wait until mailbox has capacity to store a message, but not longer. This starts to be important for scalability when actors perform long-running asynchronous activities.
from kotlinx.coroutines.
@elizarov thanks for explanation,
in some sense a Mutex
act like an actor with capacity = 0
and without coroutine context.
I fix my draft, but I sure that it is possible to implement a better one.
class TaskChannel(
context: CoroutineContext = DefaultDispatcher,
capacity: Int = 0,
lazyInit: (suspend CoroutineScope.() -> Unit)? = null
) {
private val tasks = Channel<Task<*>>(capacity)
private var lazyInit: Deferred<*>? = async(context, start = CoroutineStart.LAZY) {
lazyInit?.invoke(this)
launch(coroutineContext) {
tasks.consumeEach { it() }
}
}
/**
* Force lazy initialization
*/
suspend fun init() {
lazyInit?.run {
await()
lazyInit = null
}
}
suspend fun <T> act(block: suspend () -> T): Deferred<T> {
init()
val task = Task(block)
tasks.send(task)
return task
}
suspend fun <T> actAndReply(block: suspend () -> T): T = act(block).await()
private class Task<T>(block: suspend () -> T) : CompletableDeferred<T> by CompletableDeferred() {
private var block: (suspend () -> T)? = block
suspend operator fun invoke() {
try {
complete(block!!())
} catch (t: Throwable) {
completeExceptionally(t)
} finally {
block = null
}
}
}
}
from kotlinx.coroutines.
Even with capacity = 0 the Actor is asynchronous
The Mutex ... is always synchronous.
@elizarov can you confirm the follow code snippet?
suspend fun operationB(otherParams...): Result = actAndReply {
// logic here
}
Is the functions's return typeResult
and not Deferred<Result>
?
Accordly with #261 it is pretty easy write the act
function on top of a single private executor.
from kotlinx.coroutines.
I've created an implementation of such an "ActorState" (as described by @fvasco)
Maybe it can be of some use, or you could also tell me how it is complete crap ;)
https://gist.github.com/vemilyus/c19f5d75525a37b33c4640bfd61158fe
from kotlinx.coroutines.
you could also tell me how it is complete crap
I am really tempted :)
You should use only a read-write mutex (#94), or a regular channel or mutex.
from kotlinx.coroutines.
The elizarov's proposal looks great in paper, but I would not delegate the sharing check responsibility on the IDE.
I prefer the Akka Typed approach where sharing is forbidden by the compiler and the separation between the channel/mailbox and the behavior is explicit.
from kotlinx.coroutines.
@gortiz At this point we are looking at pure library/IDE solution. Langauge-level changes are out the question for now. As for the explicit separation of behaviour, our goal here is to reduce the amount of boilerplate one need to write to define an actor, not increase it.
from kotlinx.coroutines.
@elizarov I'll do that! I'm going to wait until your final solution is released then I'll build on top of that. @gortiz do you have plans for doing something similar?
from kotlinx.coroutines.
@elizarov do you have a rough ETA on the final version of this Issue?
from kotlinx.coroutines.
Kotlin Flow is gonna be awesome. I'll keep myself busy with something else in the meantime.
from kotlinx.coroutines.
No decision yet on current actors.
from kotlinx.coroutines.
As this does not really mention much around this topic and I could also not find that much in other issues, what is the current statur around supervisors/supervision of actors, i.e. having strategies for automated restarts if a coroutine crashes. Is that something on the roadmap? If so is this something covered by this issue or would it make sense to create a new issue for this to have one place everyone can find who is interested in this topic?
from kotlinx.coroutines.
@Globegitter Automated restarts has to be a separate issue. Currently, there are no plans nor designs around it. It seems to work pretty well using existing tools when you just define a code that launches your coroutine as a function as then call it on crash to restart. It requires some boiler-plate to write, but due to good higher-order functions in Kotlin you'd need to write the corresponding code only once per project. For example, if you want to restart on any exception, just define:
fun CoroutineScope.launchAndRestart(body: suspend () -> Unit) {
fun start() = launch {
try { body() }
catch (e: Throwable) { start() }
}
start()
}
Then use it: launchAndRestart { ... some code ... }
Note, that you can easily add your project-specific logging, exception-handling, and timeout logic. It does not seem to be very useful to have this kind of function in the core library, since it is really short.
from kotlinx.coroutines.
Yeah true that is not too difficult indeed, but then looking at how other languages/frameworks have implemented that they usually offer different strategies, such as OneForOne, AllForOne, RestForOne, with optional exponential backoff, configuring an optional max number of retries etc. and getting that all right becomes imo less trivial all of the sudden. Granted, I am still quite new to this actor pattern and I am still in very early phases of our project, so still figuring out what behaviour makes the most sense etc, but from my point of view, if there was more native support for that, or some mentions in the docs about this it would make experimentation and evaluation easier.
All that said, I love the progress you guys are making and the overall direction this is heading into and coming from a mostly pythong/golang background it has been enjoyable to dive into coroutines/actors with kotlin.
from kotlinx.coroutines.
What about idea to port Akka ideas to Kotlin? Maybe without clustering first just core concepts.
from kotlinx.coroutines.
I would choose to avoid the fragmentation of the ecosystem. It is not trivial to create (and maintain!) a framework like Akka. In fact the worst thing about Akka is that the Java community (almost) ignores it, when it is an amazing framework to work with.
The main advantage of a Kotlin version of Akka is that it could be easily compiled to JS meanwhile connecting scala-js and kotlin-js and compiling Akka to js may be difficult (if possible). At the same time, I don't know web or android developers really need the complex actor patterns implemented in Akka. I would say that in these scenarios Kotlin actors are powerful enough (but I have almost 0 experience in these scenarios).
What I would really like to have is a Akka Kotlin DSL (the same way there is an Akka Java and Scala DSL), which would be almost as easy to implement as the Log4j Kotlin DSL.
from kotlinx.coroutines.
But by using Kotlin's foundation of async programming it would be easier to implement the same in Kotlin than it was for Akka in Scala. Kotlin has by default all primitives for that (channels, queues, back pressures, contexts(coroutines), supervisors).
Maybe with CoroutineContext serialization and deserialization it would be not so hard to implement Akka's Location Transparency ( https://doc.akka.io/docs/akka/current/general/remoting.html )
Sure messaging and clustering is complex thing and for this better to use existing solutions (Spark streaming, Kafka or similar ..) for clustering maybe (HashiCorp Consul - https://learn.hashicorp.com/consul/ )
I'd also would like to have something similar as Akka, but due lack of Scala knowledge in our team and unknown future of Scala itself we are avoiding it.
from kotlinx.coroutines.
@elizarov thanks for the upvote, I'll start on the tests for it.
from kotlinx.coroutines.
We are not ready to accept PR for actors right now.
Adding a feature is not only about the code that solves a specific user task or problem.
It is about the feature being consistent with existing features and future ones (that we already discussed and not published anywhere), about the same "library experience", about approachable and non-clashing naming (with existing features, with the-same-name-but-slightly-different-behaviour from other languages etc.), about being performance-proof, about naming bikeshedding (a lot of bikeshedding), about API shape that is hard to misuse and lot more. The actual code is not the biggest issue here :)
It's just a lot of work we are neither ready to do right now for actor nor ready to guide someone through this path via ten iterations of PR reviews.
from kotlinx.coroutines.
Okay. I'll make a fork and use that for my project.
from kotlinx.coroutines.
Is there likely going to be any API level changes to how actor
works with Coroutines at present?
from kotlinx.coroutines.
No on the near future. We do plan to introduce a more convenient API for writing actors. Existing actor
function will be either integrate with new API or deprecated with some clear replacement. Old code using it will continue to work. We don't have a complete design on the table yet, so it is subject to change.
from kotlinx.coroutines.
Where is the new design being discussed? Is a lot of the setup erlang
/elixir
inspired?
from kotlinx.coroutines.
@harsh183 It is not in active discussions now, but postponed until the most urgent Flow
issues are implemented.
Unfortunately, that's not much design inspiration we can take from either Erlang or Elixir, because both are dynamically typed languages, and so they avoid having to deal with the complicated problem of providing type-safe and boilerplate-free actor abstraction, which is our goal here.
from kotlinx.coroutines.
from kotlinx.coroutines.
Something Iām thinking about is how send(..)
on the current actor
behaves (as it implements SendChannel<T>
).
I believe it might be useful to suspend until the actor has completed its processing of the message.
So then there would be 3 ways to communicate into the actor:
- non-suspending
offer(..)
2.send(..)
that suspends until the actor can guarantee the message will be delivered send(..)
that suspends until the actor has processed the message
Under what conditions is number 2 useful?
from kotlinx.coroutines.
What is the benefit of using Actors instead of StateFlow? Here two implementations of CounterModel:
fun CoroutineScope.counterActor() = actor<CounterMsg> {
var counter = 0 // actor state
for (msg in channel) { // iterate over incoming messages
when (msg) {
is IncCounter -> counter++
is GetCounter -> msg.response.complete(counter)
}
}
}
and
class CounterModel {
private val _counter = MutableStateFlow(0) // private mutable state flow
val counter: StateFlow<Int> get() = _counter // publicly exposed as read-only state flow
fun inc() {
_counter.value++
}
}
from kotlinx.coroutines.
Thanks for taking the time to answer @fvasco
So MutableStateFlow
is not atomic, thus we can have a data race. Am I understanding it correctly?
If so, the documentation seems to imply that data race is not possible. https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-state-flow/index.html
Concurrency
All methods of data flow are thread-safe and can be safely invoked from concurrent coroutines without external synchronization.
Unless I misunderstood it.
from kotlinx.coroutines.
@fvasco - Since the get
and set
methods of StateFlow
are thread-safe, isn't there no need for _counter.value
to be atomic? All get
and set
requests will be processed in order meaning that there would be no race conditions. If you ended up doing background work inside the model to modify _counter
, I could see an issue, but as long as you are doing computations within that context you should be fine, right?
from kotlinx.coroutines.
What should I use in the meantime if I don't want to use a Mutex
(because I'm afraid of deadlocks)? What I have right now is a super primitive wrapper class that can get/transform its state:
class Atom<T : Any>(initialValue: T) {
@Volatile
private var value: T = initialValue
override fun get(): T = value
@Synchronized
override fun transform(transformer: (T) -> T): T {
value = transformer(value)
return value
}
}
but with this I can still shoot myself in the foot if two of these Atom
s cause a deadlock. What i'm looking for is a way to calculate a new value for shared mutable state (transform
here) that's relatively fast. I scrapped my other implementation that was using actor
when it became obsolete, but I'm not happy with the current solution.
from kotlinx.coroutines.
@dimsuz I don't know but at least the pros and cons of doing this should be outlined in their RFC
from kotlinx.coroutines.
@Kshitij09-sc
You are mixing blocking (dispatchChannel.trySendBlocking(action)
) and asynchronous (action: suspend ActorContext.() -> Unit
) code, this is a delicate choice and may lead to errors.
The Job in initializeIfNeeded
executes actions concurrently (two nested launch
), so printCount
will print a number concurrently with other tasks.
You are using an AtomicBoolean
instead of a lazy launch
.
from kotlinx.coroutines.
@fvasco Agree with latter 2 points. Regarding mixing trySendBlocking(action)
with suspending action: suspend ActorContext.() -> Unit
, I considered this to be able to detect deadlocks. trySendBlocking will return failure once queue is full, queue might get full due to less buffer capacity or some action went into infinite loop (deadlock) which could be thrown as exception in debug environment.
from kotlinx.coroutines.
Related Issues (20)
- StateFlow, different values, compareAndSet return false HOT 3
- 1.9.0-RC test task:compileDemoDebugUnitTestKotlin error Suspension functions can only be called within coroutine body. HOT 1
- Improve invokeOnCompletion and invokeOnCancellation API HOT 1
- Mention `testScope.backgroundScope` in `UncompletedCoroutinesError`
- Allow `Dispatchers.Unconfined` to use the event loops as time sources
- Add optional support for Micrometer Context Propagation
- `withContext` may execute code in the wrong context if the `coroutineContext` misleads it HOT 3
- SharedFlow doesn't have same parameters as in constructor function HOT 2
- Consider deprecation cycle for `CoroutineDispatcher.invoke` HOT 6
- How to prevent a SharedFlow collect values when the activity resumes? HOT 1
- Consider discouraging `CoroutineStart.LAZY`
- Some problem about `addLast` in LockFreeTaskQueue HOT 1
- Maybe it's reasonable to recommend wrapping a `callbackFlow` initialization into `try`-`finally`? HOT 4
- Dispatcher failures may leave coroutines uncompleted HOT 1
- `DelayWithTimeoutDiagnostics` missing from R8 rules? HOT 3
- Introduce Flow.all/any/none operators HOT 3
- [WASM] JsException: Exception was thrown while running JavaScript code kotlinx.coroutines.error_$external_fun HOT 6
- Suppressed exceptions are lost during stacktrace recovery HOT 1
- `runBlocking` executes other tasks instead of its own HOT 1
- "Kotlin Compiler Error: NoClassDefFoundError for kotlin/reflect/full/KClasses during Gradle build" HOT 1
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
š Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. ššš
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ā¤ļø Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from kotlinx.coroutines.