Comments (45)

elizarov avatar elizarov commented on September 23, 2024 24

The expected future looks like this. There is going to be only a single pool of threads in addition to the UI thread.

  • The experimental coroutines scheduler becomes DefaultDispatcher by default (CommonPool becomes deprecated). So, you launch { ... } a CPU-bound task in the default dispatcher. Its default parallelism is equal to the number of cores.
  • The experimental coroutines scheduler backs the IO dispatcher, too, but the default parallelism of IO is effectively unbounded. You can do withContext(IO) { ... } without having to switch to a different thread, but if too many threads get blocked on IO, then new threads will spin up automatically to help.
  • You can create additional dispatchers backed by the same thread pool with a custom limit on parallelism for doing things like synchronous DB access (see #261).

from kotlinx.coroutines.

elizarov avatar elizarov commented on September 23, 2024 5

Let me clarify. Here is the problem we have: we need to be able to do both CPU-bound tasks and IO/blocking tasks with coroutines. The original proposal (this GitHub issue) was to achieve this via a dedicated IO-tuned dispatcher, so you'd separate these two use cases by choosing an appropriate dispatcher:

launch(CommonPool) { ... cpu-bound code ... }
launch(IO) { ... blocking IO code ... }

The alternative idea is to borrow Scala's approach: instead of two separate dispatchers let's have a single dispatcher (let's call it DefaultDispatcher) and use it like this:

launch(DefaultDispatcher) { ... cpu-bound code ... }
launch(DefaultDispatcher) { blocking { ... blocking IO code ... } }

What I particularly like about it is that with this approach it makes sense to truly make this DefaultDispatcher a global default for all coroutine builders and just write:

launch { ... cpu-bound code ... }
launch { blocking { ... blocking IO code ... } }

elizarov avatar elizarov commented on September 23, 2024 3

That is what I'm thinking about... For example, if you do boo(); blocking { foo() } in an "IO-aware" thread, it can run without switching threads (which is cool for performance reasons). However, if you run the same code in the UI thread, then blocking { foo() } can suspend, switch to a background thread, and then resume back in the UI thread.

elizarov avatar elizarov commented on September 23, 2024 2

@maxd Unfortunately, there is no "one fits everybody" solution to cancelling blocking operations. Different blocking libraries support different approaches to cancellation, or none at all. Some of them support the interrupted flag; others crash and/or hang when their thread is interrupted. For those cases that do support interruption, we have the following issue open: #57

I would not recommend using the code you've provided in production for two reasons:

  • It creates and leaks a thread with Executors.newSingleThreadExecutor() on each invocation.
  • It uses blocking future.get inside of async, thus unnecessarily blocking a thread in the default context.

As a work-around for your particular case I'd recommend doing this.

First, implement awaitInterruptible extension for CompletableFuture in a straightforward way:

suspend fun <T> CompletableFuture<T>.awaitInterruptible(): T =
    suspendCancellableCoroutine { cont ->
        whenComplete { value, exception ->
            when {
                exception != null -> cont.resumeWithException(exception)
                else -> cont.resume(value)
            }
        }
        cont.invokeOnCompletion { 
            cancel(true) // interrupt running!
        }
    }

I've submitted a related feature request: #259

Then define one shared thread-pool for all your blocking calls:

val threadPool = Executors.newCachedThreadPool() // this seems to be the best type of pool to use

Finally, you can define the following helper function to make conversion of your interruptible blocking calls to suspending cancellable functions straightforward:

suspend inline fun <T> blockingInterruptible(crossinline block: () -> T) =
    CompletableFuture.supplyAsync(Supplier { block() }, threadPool).awaitInterruptible()

voddan avatar voddan commented on September 23, 2024 2

@elizarov What's the state of mind on this issue? Dispatchers.IO is as confusing as ever.

IMHO of all the names suggested above, Dispatchers.Elastic is the most direct and self-describing one. Also, it draws a useful parallel with Amazon Elastic Compute Cloud.

elizarov avatar elizarov commented on September 23, 2024 1

@fvasco It finally clicked into my mind when I understood how it shall interface with a scheduler. The difference from PR #83 is that it should not require a switch of context at all if the scheduler that currently runs coroutine natively supports blocking operations itself and counts the number of ongoing blocking operations to make decisions on creating new threads.

dave08 avatar dave08 commented on September 23, 2024 1

Or ElasticPool, ExtendablePool? And withBlocking { }?

fvasco avatar fvasco commented on September 23, 2024

Java 6 has Executors.newCachedThreadPool, it is easy to create and easy to customize:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

We can tune maximumPoolSize at startup to limit "the maximum allowed number of threads" through a system property.

However, we should consider that specific use cases should create a fine-tuned IO executor instead of using the global one.
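A minimal Kotlin sketch of this tuning, using only the JDK. The property name example.io.maxThreads and both function names are hypothetical, chosen for illustration. Note that with a SynchronousQueue there is no task queue, so once all maxThreads threads are busy, further submissions are rejected rather than queued; the second function demonstrates that.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.SynchronousQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

// Hypothetical property name, used only in this sketch.
const val MAX_IO_THREADS_PROPERTY = "example.io.maxThreads"

// Same shape as Executors.newCachedThreadPool(), but with a configurable upper bound.
// Without the system property the pool stays effectively unbounded.
fun newBoundedCachedThreadPool(
    maxThreads: Int = Integer.getInteger(MAX_IO_THREADS_PROPERTY, Int.MAX_VALUE)
): ThreadPoolExecutor =
    ThreadPoolExecutor(0, maxThreads, 60L, TimeUnit.SECONDS, SynchronousQueue<Runnable>())

// Occupies both threads of a 2-thread pool, then reports whether a third task is rejected.
fun thirdTaskIsRejected(): Boolean {
    val pool = newBoundedCachedThreadPool(maxThreads = 2)
    val release = CountDownLatch(1)
    try {
        repeat(2) { pool.execute { release.await() } } // both threads are now blocked
        return try {
            pool.execute { }                           // no idle thread and no queue
            false
        } catch (e: RejectedExecutionException) {
            true
        }
    } finally {
        release.countDown()                            // let the blocked tasks finish
        pool.shutdown()
    }
}
```

A bounded pool of this kind therefore needs either a RejectedExecutionHandler or a real queue if callers are expected to wait instead of failing.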

elizarov avatar elizarov commented on September 23, 2024

We need to make sure that there is no confusion about the difference between run(IO) { /* blocking code here */ } and runBlocking { ... }.

fvasco avatar fvasco commented on September 23, 2024

Kotlin can provide an explicit annotation (like @DslMarker) to prevent invocation from a coroutine.

So we can rename the function to

@Blocking
fun runAndWait( ... )

fvasco avatar fvasco commented on September 23, 2024

Using the code:

val job = ...
run(IO + job) { ... }
job.cancel()

Should the I/O thread be interrupted?

elizarov avatar elizarov commented on September 23, 2024

We should not do interruption by default. It is dangerous. But we definitely need some easy way to adapt an interruptible blocking API (when we are sure it plays well with interrupts) to coroutines. I don't know yet what is going to be the best solution for this. This is also related to #57

elizarov avatar elizarov commented on September 23, 2024

@fvasco W.r.t. the @Blocking annotation, it is actually a larger issue that spans both Kotlin and Java. Here is a related issue in YT: https://youtrack.jetbrains.com/issue/KT-15525

voddan avatar voddan commented on September 23, 2024

IMHO the name is misleading. When I see run(IO){} I think it is a coroutine context that guarantees that it runs on the main thread, even if started somewhere else. Or is it an impossible case?

elizarov avatar elizarov commented on September 23, 2024

Hh... the assumption was that the code inside run(IO) { ... } runs inside special IO threads that are designed for blocking IO. The naming problem is how we highlight that it is designed for blocking IO only and that you should not go there if you have async IO....

LouisCAD avatar LouisCAD commented on September 23, 2024

@elizarov Why not use blockingIO then, instead of just IO?

mykola-dev avatar mykola-dev commented on September 23, 2024

because it's not blocking? and rxjava already has Schedulers.io() which does the same thing

oshai avatar oshai commented on September 23, 2024

I would call it IOPool (similar to CommonPool), with the ability to give it the number of concurrent operations as a parameter.

voddan avatar voddan commented on September 23, 2024

BTW, why do we need to call it IO? Isn't it suitable for any blocking operations, IO or not? I feel that the main property of this dispatcher is that it can vary the number of its threads. Maybe something like BlockingHeap would be a better fit?

fvasco avatar fvasco commented on September 23, 2024

A bit unrelated question.

How does a coroutine integrate with locks?
Using IO for a blocking call leads to:

run(IO) { blockingQueue.take() }

This might look reasonable, but putting this line in actors, in thousands of actors, can easily drain the pool or, on the other hand, create thousands of threads.

Currently I can't figure out how to integrate coroutines with synchronized blocks, locks, etc.
IO requests are a particular case of locking code: limiting the number of requests can be a benefit, and it never produces a deadlock.

Without a golden rule for executing any kind of blocking code, I consider it really helpful to highlight the IO nature of this dispatcher.

elizarov avatar elizarov commented on September 23, 2024

On the naming: we don't have to have IO in the name (it is indeed for blocking ops), but we want it to be discoverable. The problem with "arbitrary blocking" is that in a large app you might want to have different pools for different kinds of blocking ops, but we need to provide some "least common denominator" out of the box, and the problem of blocking IO is what people are facing all the time, so IOPool might be a good name. I'm also thinking about "blocking" in the name, but as I've already alluded to earlier, run(BlockingPool) { ... } is too confusing w.r.t. runBlocking { ... }. Might not be an issue if we rename run, though.

elizarov avatar elizarov commented on September 23, 2024

@fvasco Good point. You could use "IO" pool for integration with blocking queues etc. I mean, if you have to integrate with somebody else's BlockingQueue then you have no choice but to schedule the wait into some pool that is designed with blocking operations in mind.

elizarov avatar elizarov commented on September 23, 2024

Here is a fresh idea that was pointed out by Raul Raja in the public Slack. It suffices to have a single CPU-optimized dispatcher if we also provide a Scala-like blocking { ... } wrapper function that tags the block of code as doing a "blocking" operation for the scheduler. Now, the scheduler will know how many of its threads are blocked and can create new threads as needed to continue to be able to pump CPU-consuming stuff. This seems to me a better programming model for doing blocking IO compared to the dedicated scheduler/dispatcher.
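For comparison, the JDK's ForkJoinPool already offers a hook of this kind, ForkJoinPool.ManagedBlocker. The sketch below wraps it in a blocking helper. It illustrates the "tag the blocking region" idea on a plain ForkJoinPool only and is not the design of the coroutine scheduler. The helper name and its parameter name are assumptions made for this example.

```kotlin
import java.util.concurrent.ForkJoinPool

// Runs [body] while informing the current ForkJoinPool (if the caller is one of its
// workers) that this thread is about to block, so the pool may add a spare thread.
// When called from any other thread, it simply runs [body] in place.
fun <T> blocking(body: () -> T): T {
    var done = false
    var value: T? = null
    ForkJoinPool.managedBlock(object : ForkJoinPool.ManagedBlocker {
        // Called on the calling thread; performs the blocking work itself.
        override fun block(): Boolean {
            value = body()
            done = true
            return true          // true means no further blocking is needed
        }

        // Lets the pool skip block() if the work has already completed.
        override fun isReleasable(): Boolean = done
    })
    @Suppress("UNCHECKED_CAST")
    return value as T
}
```

Because the helper is an ordinary function rather than a suspending one, it can be called from any code, for example blocking { queue.take() }, and it only has an effect on scheduling when the caller happens to run on a ForkJoinPool worker.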

fvasco avatar fvasco commented on September 23, 2024

Please correct me if I am wrong:
is Raja's proposal to create an unbounded thread executor for blocking operations?
In that case coroutines remain cheap, but blocking code in coroutines may become a fork bomb.

oshai avatar oshai commented on September 23, 2024

fvasco avatar fvasco commented on September 23, 2024

I agree, but it does not look so fresh;
it looks like PR #83.

elizarov avatar elizarov commented on September 23, 2024

@oshai You can always create a context with a limited number of threads for your blocking ops. It is a very good choice for people striving for the best control. For example, if you are doing JDBC and you want to limit your code to at most n concurrent DB operations, then you'll define val dbContext = newFixedThreadPoolContext(n, "DB") and use it for all your blocking DB operations, wrapping them into run(dbContext) { ... }.
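The limiting effect of such a fixed-size context can be seen with a plain JDK executor. This is only an illustration of the bound itself, not of newFixedThreadPoolContext; the function name and the simulated 20 ms "DB call" are assumptions made for the sketch.

```kotlin
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

// Submits [tasks] simulated blocking operations to a pool of [n] threads and returns
// the largest number of operations that were ever in flight at the same time.
fun maxConcurrentOperations(n: Int, tasks: Int): Int {
    val dbPool = Executors.newFixedThreadPool(n)    // stand-in for the "DB" context
    val inFlight = AtomicInteger(0)
    val peak = AtomicInteger(0)
    repeat(tasks) {
        dbPool.execute {
            val now = inFlight.incrementAndGet()
            peak.accumulateAndGet(now) { a, b -> maxOf(a, b) }
            Thread.sleep(20)                        // simulated blocking DB call
            inFlight.decrementAndGet()
        }
    }
    dbPool.shutdown()                               // accept no new tasks, finish queued ones
    dbPool.awaitTermination(10, TimeUnit.SECONDS)
    return peak.get()
}
```

However many tasks are submitted, the returned peak cannot exceed n, because excess tasks wait in the executor's queue until a thread is free.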

The idea behind blocking { ... } is that it is going to be convenient for cases where you don't really care to set a limit, which is the case when the number of your concurrent blocking operations is limited through some other means, for example, by simply having a limited number of running coroutines that do blocking operations.

elizarov avatar elizarov commented on September 23, 2024

@fvasco The other difference is that this blocking { ... } does not have to be suspend. It can be defined with the following signature:

inline fun <T> blocking(block: () -> T): T

This way, you will be able to use it anywhere in your blocking code, regardless of whether you are going to run it from a coroutine or not, and it will have the effect of signaling to the IO-aware coroutine scheduler when it is being used from coroutines.

However, the other implementation option is to declare it as suspend and then make its behavior dependent on what kind of coroutine it is invoked from. It can simply serve as a signal when it is invoked from an IO-aware scheduler, or switch (like run does) into a default IO-aware scheduler otherwise (if you use it from a UI-bound coroutine, for example).

fvasco avatar fvasco commented on September 23, 2024

Hi @elizarov,
building an unbounded thread pool is trivial in Java, and so is checking the thread ownership.

Both changes are tiny patches over #83. Maybe your proposal is more complex, so I will wait patiently for a code review.

voddan avatar voddan commented on September 23, 2024

Did I get it correctly that if I do boo(); blocking { foo() } inside a coroutine, then foo() may run well after boo() and in another thread, depending on the scheduler?

maxd avatar maxd commented on September 23, 2024

I want to clarify how cancellation will work in such a case:

val job = launch { 
    blocking { 
        // this blocking IO code is hung or executing too long without ability to handle `isActive`
    } 
}

...

job.cancel()

Can the job.cancel() interrupt/abort execution of blocking code?

elizarov avatar elizarov commented on September 23, 2024

@maxd Unfortunately, most of the time blocking code on the JVM cannot be readily interrupted/aborted (see, for example, all the blocking I/O APIs on the JVM, JDBC drivers, etc.), so there is no way to provide such a facility out of the box. However, for those rare cases when there is some way to abort running blocking code, you can always write the corresponding aborting logic.

For example, if you are doing a blocking operation on a network socket, there is a documented (guaranteed to work) way to abort it by closing the underlying socket, so one can write:

withContext(IO) {
    val job = coroutineContext[Job]!! // retrieve the current job from the coroutine context
    job.invokeOnCompletion {
        if (job.isCancelled) socket.close() // abort blocking operations on cancellation by closing socket
    }
    socket.performSomeBlockingOperationHere()
}

We might provide an easier to use API for that, like coroutineContext.invokeOnCancellation { ... } extension function.

However, note that it is not a general solution in any way. For example, you cannot generally abort an ongoing blocking JDBC operation in such a way, as trying to close a JDBC connection while the operation is in progress usually blocks until the operation is complete.
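The socket case can be tried in isolation with plain JDK classes. The sketch below blocks a thread in a read on a loopback connection and then closes the socket from another thread. The function name is hypothetical, and the 200 ms pause is only there to make it likely that the reader is already blocked when the socket is closed.

```kotlin
import java.net.InetAddress
import java.net.ServerSocket
import java.net.Socket
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import kotlin.concurrent.thread

// Returns the exception that ended the blocked read, or null if the read returned normally.
fun abortBlockedReadByClosing(): Throwable? {
    val loopback = InetAddress.getLoopbackAddress()
    ServerSocket(0, 1, loopback).use { server ->        // port 0: the OS picks a free port
        val client = Socket(loopback, server.localPort)
        server.accept().use { _ ->                      // the peer stays open and silent
            val outcome = CompletableFuture<Throwable?>()
            val started = CountDownLatch(1)
            thread {
                started.countDown()
                try {
                    client.getInputStream().read()      // blocks: the peer never writes
                    outcome.complete(null)
                } catch (e: Throwable) {
                    outcome.complete(e)
                }
            }
            started.await()
            Thread.sleep(200)                           // give the reader time to block
            client.close()                              // aborts the pending read
            return outcome.get(5, TimeUnit.SECONDS)
        }
    }
}
```

The reader thread ends with a java.net.SocketException, which is the behavior documented for Socket.close() when another thread is blocked in an I/O operation on that socket.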

maxd avatar maxd commented on September 23, 2024

@elizarov I agree that some things don't provide the ability to cancel long-running tasks. However, in some cases it is still necessary to cancel them forcibly.

For example, I have a GUI application. The main priority here is user experience. Reliability comes second because, to be honest, even if I call Thread#interrupt on long-running jobs that just read data, in most cases everything will work fine. So, if I want to check a connection and the application shows a progress dialog with a cancel button, I should be able to abort this check even if it is hung (e.g. an external library sets the default connection timeout to 60 seconds and I can't change that), because I lose nothing and it improves the user experience (the user realizes that the connection is hung after 5 seconds and doesn't want to wait another 55 seconds).

I think it would be great if Kotlin coroutines could help handle such rare cases, but it looks like the blocking approach described in the previous comments doesn't help to solve this problem. I mean that Kotlin coroutines will not provide the ability to interrupt/abort blocking calls. Right?

So, if this is true, I see the following possible ways to handle it myself:

  1. don't use Kotlin coroutines for such cases at all
  2. wrap the long-running task in a separate thread/future (as in the socket example above), if you understand all the consequences of it:
val result = async {
    val executor = Executors.newSingleThreadExecutor()
    val future = executor.submit(Callable<Int> {
        longOperation()
    })

    val job = this.coroutineContext[Job]!!
    job.invokeOnCompletion(true) { // DEPRECATED: may be replaced to `invokeOnCancellation { ... }` in the future
        if (job.isCancelled) {
            future.cancel(true)
        }
    }

    future.get()
}

Am I right?

maxd avatar maxd commented on September 23, 2024

First of all, I want to say thank you for these examples. They are very useful for me.

The workaround described above has a problem: CompletableFuture#cancel(true) doesn't interrupt the execution of a long-running operation. I have found on the Internet that one way to solve it is to implement my own "CompletableFuture" with the ability to interrupt a long-running operation. So, I think I will use your workaround with a custom "CompletableFuture" to solve my problem with cancelling long-running operations started from Kotlin coroutines.
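The difference is easy to reproduce with the JDK alone. In the sketch below (the function name is hypothetical), the same sleeping task is started either through CompletableFuture.runAsync or through ExecutorService.submit, and cancel(true) is then called on the returned future.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.Future
import java.util.concurrent.atomic.AtomicBoolean

// Starts a sleeping task, cancels it with cancel(true), and reports whether the
// thread running the task actually received an interrupt.
fun wasInterrupted(useCompletableFuture: Boolean): Boolean {
    val pool = Executors.newSingleThreadExecutor()
    val started = CountDownLatch(1)
    val finished = CountDownLatch(1)
    val interrupted = AtomicBoolean(false)
    val task = Runnable {
        started.countDown()
        try {
            Thread.sleep(500)                   // stands in for a long blocking call
        } catch (e: InterruptedException) {
            interrupted.set(true)
        } finally {
            finished.countDown()
        }
    }
    val future: Future<*> =
        if (useCompletableFuture) CompletableFuture.runAsync(task, pool)
        else pool.submit(task)                  // returns a FutureTask-backed future
    started.await()                             // make sure the task is running
    future.cancel(true)
    finished.await()                            // wait until the task body has ended
    pool.shutdown()
    return interrupted.get()
}
```

With CompletableFuture the task sleeps for the full 500 ms and is never interrupted, since its cancel method documents that mayInterruptIfRunning has no effect. The future returned by submit does interrupt the worker thread, so it can serve as a basis for interruptible wrappers without writing a custom CompletableFuture.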

Thank you.

mkotlikov avatar mkotlikov commented on September 23, 2024

This was my solution, that way you can call it with async(CachedThreadPoolContext()){}. Thoughts?

fun CachedThreadPoolContext() = CachedThreadPoolDispatcher()
class CachedThreadPoolDispatcher : ExecutorCoroutineDispatcherBase() {
    companion object {
        private val executor = Executors.newCachedThreadPool()!!
    }
    override fun dispatch(context: CoroutineContext, block: Runnable) = executor.execute(block)
}

This was my original solution using CompletableFuture:

suspend fun <T> asyncResult(execute: () -> T): T = Async.asyncResult(execute)
object Async {
    private val ASYNC_RESULT_THREAD_POOL =  Executors.newCachedThreadPool()

    suspend fun <T> asyncResult(execute: () -> T): T {
        data class AsyncResponse<T>(val result: T?, val exception: Exception?)

        val responseChannel = Channel<AsyncResponse<T>>()
        val asyncResponse: AsyncResponse<T>?

        CompletableFuture.runAsync({})

        CompletableFuture.runAsync (
                Runnable {
                    try {
                        val executionResult = execute()
                        launch(NoopContinuation.context) {
                            responseChannel.send(AsyncResponse(
                                    result = executionResult,
                                    exception = null
                            ))
                        }
                    } catch (exception: Exception) {
                        launch(NoopContinuation.context) {
                            responseChannel.send(AsyncResponse(
                                    result = null,
                                    exception = exception
                            ))
                        }
                    }
                },
                ASYNC_RESULT_THREAD_POOL
        )

        asyncResponse = responseChannel.receive()

        if (asyncResponse.exception != null) {
            throw asyncResponse.exception
        }

        return asyncResponse.result!!
    }
}

elizarov avatar elizarov commented on September 23, 2024

UPDATE on the current design thinking around this issue:

  • DO introduce some kind of IO context so that one can write withContext(IO) { ... } around a blocking operation to ensure that the limited-concurrency pool that executes coroutines is not blocked. However, it is going to be backed by an efficient implementation that does not perform any actual thread context switch when going from DefaultContext into the IO context (see #261).

  • MAYBE implement some kind of blocking { ... } marker block as a complementary mechanism that can be used in non-suspending code that can get invoked on the coroutines thread pool and performs identically to withContext(IO) { ... } when used from a suspending function.

Open questions:

  • How do we name the context? Using IO is questionable, as it should be used for any blocking operations, not necessarily I/O related.

  • How do we name the marker block? Using blocking { ... } is questionable, as it is quite similar to runBlocking { ... }, which would surely cause lots of confusion and misunderstanding.

LouisCAD avatar LouisCAD commented on September 23, 2024

BlockingOps as an alternative name to IO comes to my mind. It's 2 words, like CommonPool.

GeoffreyMetais avatar GeoffreyMetais commented on September 23, 2024

What about Extended or Expandable? As it's an expanding context,
this name emphasizes the fact that this context is not fixed and is more costly than CommonPool.

LassoMike avatar LassoMike commented on September 23, 2024

I like CachedThreadPoolContext because it sounds like the existing newSingleThreadContext and newFixedThreadPoolContext. Or CachedThreadPool because it sounds like CommonPool.

fvasco avatar fvasco commented on September 23, 2024

"Using blocking { ... } is questionable, as it is quite similar to runBlocking { ... } which would ensure lots of confusion and misunderstanding"

It is also possible to settle this debate by renaming the runBlocking function.
As a counterpart to the async { } function we can rename runBlocking { } to sync { }, for example.

"How do we name the marker block"

I consider a dedicated builder useful, especially to indicate the synchronous/blocking nature of the invoked functions.
The name should sound like the defined pool: for a BlockingPool pool we can use blocking { }, for an IO pool we can use io { }, and so on...

elizarov avatar elizarov commented on September 23, 2024

Renaming runBlocking is on the table, too.

As for the naming of the context, please keep in mind #261. All the contexts will be backed by the same shared pool, so I don't think it is appropriate to use Pool in their names. Moreover, I'm considering deprecating CommonPool and recommending its replacement with DefaultContext when we are done with this.

Let me also link a related discussion on the naming of dispatchers: #41. I'm not convinced that we should isolate the names of the dispatchers into a separate namespace, but I'm also not firmly convinced that they should be top-level.

fvasco avatar fvasco commented on September 23, 2024

Personally I consider DefaultContext a good name, but a little messy.
Defining only DefaultContext invites defining an AlternativeContext: chaos over confusion.

A coroutine context appears richer than a single coroutine dispatcher, so using Context leads me to consider it as a dispatcher plus something else...
What is wrong in DefaultContext is the dispatcher only. We can name it CpuBoundDispatcher; giving it a clear name makes the problem more evident: we cannot use the CpuBoundDispatcher for a non-CPU-bound task.

Instead, defining

val DefaultContext : CoroutineContext = CpuBoundDispatcher + (optionally something else)

leads me to consider

val BlockingContext = DefaultContext + NonCpuBoundDispatcher

Unfortunately I haven't solved this issue, I am sorry.
The above considerations are confusing my mind, but it is probably the reason why this report looks more like a beauty contest than a technical problem.

elizarov avatar elizarov commented on September 23, 2024

Now that we have merged the experimental coroutines dispatcher, we can soon deliver an IO scheduler using it, even if DefaultDispatcher still points to CommonPool for some time.

LouisCAD avatar LouisCAD commented on September 23, 2024

I'm a bit confused as to what to expect for the future of kotlinx.coroutines default/recommended dispatchers. Will we have to continue running I/O separately from CPU-bound tasks, as is currently done with CommonPool and a custom dispatcher (often executor-based), or will they be merged into the new experimental coroutines dispatcher so we no longer have to worry about I/O being done in the same dispatcher as the CPU-bound code?

fvasco avatar fvasco commented on September 23, 2024

"Its default parallelism is equal to the number of cores."

Have you considered:

"The current plan is to set defaultParallelism to nCPUs + 1 as a compromise value that ensures utilization of the underlying hardware even if one coroutine accidentally blocks and helps us avoid issue #198"

and the later discussion in #261?
