
async-pool's People

Contributors

bts, expipiplus1, f-f, jwiegley, l29ah, michaelxavier, phadej, saurabhnanda, worldsender

async-pool's Issues

Cancelling an async immediately after its creation causes exception in `runTaskGroup`

The runTaskGroup function can crash with a "Match Exception" error when an async task is created and cancelled before it is scheduled to run. This issue can lead to unexpected behavior, such as the subsequent async tasks in the task group not being executed.

Consider the following example:

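{-# LANGUAGE ScopedTypeVariables #-}
import qualified Control.Concurrent.Async as Async
import qualified Control.Concurrent.Async.Pool as Pool
import Control.Concurrent (threadDelay)
import Control.Exception (SomeException, catch)

main :: IO ()
main = do
  pool <- Pool.createPool
  group <- Pool.createTaskGroup pool 2
  -- Run the scheduler in the background and print any exception it throws.
  _ <- Async.async $ Pool.runTaskGroup group `catch` (\(e :: SomeException) -> print e)
  -- Create a task and cancel it before the scheduler has started it.
  Pool.async group (print "x") >>= Pool.cancel
  _ <- Pool.async group (print "y")
  -- Keep main alive for a second so the scheduler gets a chance to run.
  threadDelay 1000000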

In this code, the first async task is created and immediately cancelled before it has a chance to be scheduled. As a result, "y" is never printed. The error message looks like this:

Match Exception, Node: 0
CallStack (from HasCallStack):
  error, called at ./Data/Graph/Inductive/Graph.hs:385:26 in fgl-5.8.2.0-AeC32udtCeIHgQIC5kCLt:Data.Graph.Inductive.Graph

The following code consistently reproduces this issue:

main :: IO ()
main = do
  pool <- Pool.createPool
  group <- Pool.createTaskGroup pool 2
  x <- Pool.async group $ print "x"
  Pool.cancel x
  Pool.runTaskGroup group

`runConcurrently` hangs whenever a number of tasks is greater than the pool capacity

Suppose I have the following piece of code:

import Control.Concurrent.Async.Pool (Concurrently(..), withTaskGroup)

main :: IO ()
main = do
  let xs = [1, 2, 3, 5, 6] :: [Int]
  s <- withTaskGroup 4
        (runConcurrently (sum <$> traverse (Concurrently . const . pure) xs))
  print s

It hangs whenever the pool size is smaller than the number of tasks. For example, the code above hangs, but if we change the pool size from 4 to 5, it doesn't.
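A possible workaround while this is open, sketched under the assumption that mapTasks is not affected by the same problem (other reports on this page run mapTasks with far more tasks than slots): compute the same sum through mapTasks instead of the Concurrently applicative. The helper name sumWithPool is hypothetical.

```haskell
import Control.Concurrent.Async.Pool (mapTasks, withTaskGroup)

-- | Hypothetical helper: the same computation as the Concurrently version,
-- but every task is submitted through 'mapTasks' on a single task group.
sumWithPool :: Int -> [Int] -> IO Int
sumWithPool slots xs = withTaskGroup slots $ \tg ->
  sum <$> mapTasks tg (map pure xs)
```

With four slots and the five inputs above, this is the configuration that hangs with runConcurrently; if the assumption holds, sumWithPool 4 [1, 2, 3, 5, 6] should return 17.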

Nested mapTasks invocations can cause deadlock

Pinging @spl

If mapTasks invokes mapTasks, deadlock can occur. Scenario:

  • mapTasks, which enqueues all of its jobs immediately in the execution graph, starts executing the first N tasks. It also waits until all tasks are finished before returning to the caller.
  • If one of those tasks calls mapTasks, it will enqueue M more tasks, and wait for them to complete as well. However, the inner mapTasks can starve because no execution slots may ever free up: all jobs are now blocked, waiting on future jobs, which are waiting on the blocked jobs to finish.

Solution: If we notice a call to wait in a thread whose threadId matches that of a running job, raise an exception that there may be a deadlock scenario.

We could either raise this exception always (which makes it easier for the developer to avoid this situation), or we could only raise it if, when wait is called, there are no available slots.
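Until such a check exists, one workaround sketch, assuming a task that itself needs parallelism can open its own task group with withTaskGroup instead of reusing the outer one: the inner tasks then compete only for the inner group's slots, which the blocked outer tasks cannot occupy. The names nestedSums and innerJob are hypothetical.

```haskell
import Control.Concurrent.Async.Pool (mapTasks, withTaskGroup)

-- | Hypothetical example of nested parallelism without sharing slots.
-- Each outer task holds one outer slot while it waits, but its inner
-- tasks are scheduled in a separate task group with its own slot limit,
-- so they are not starved by outer tasks that are blocked waiting.
nestedSums :: Int -> [[Int]] -> IO [Int]
nestedSums slots xss = withTaskGroup slots $ \outer ->
  mapTasks outer (map innerJob xss)
  where
    -- Inner level: a fresh task group per outer task.
    innerJob xs = withTaskGroup slots $ \inner ->
      sum <$> mapTasks inner (map pure xs)
```

The trade-off is that concurrency is no longer bounded by a single limit: the number of running tasks can reach roughly the outer slot count times the inner slot count.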

Document example usage of `mapReduce` (and any similar functions).

It was not immediately obvious to me how to construct a correct invocation of mapReduce. Its signature shows it returning an STM (Async a), so it seemed not too far-fetched to try to wait for it in the same STM transaction that created it:

withTaskGroup 8 \tg -> atomically (mapReduce tg tasklist >>= waitSTM)

but that fails with the dreaded "thread blocked indefinitely" error. It turns out (after finding an example in one of the tests) that the correct invocation is rather:

withTaskGroup 8 \tg -> atomically (mapReduce tg tasklist) >>= wait

That is, the waitSTM (via wait) runs in a separate atomic transaction. Some text showing correct usage may also be helpful to others, perhaps with an explanation motivating the need for the separation (though a simple rule may suffice).
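A sketch of the working pattern, with one plausible explanation for the split: mapReduce enqueues the tasks inside an STM transaction, and the scheduler cannot see them until that transaction commits. waitSTM retries while the result is not ready, and an STM retry discards every effect of the current transaction, including the enqueue. A single atomically block therefore never makes progress, which matches the "thread blocked indefinitely" error above. The helper name sumAll is hypothetical, and Sum from Data.Monoid is used as the Monoid.

```haskell
import Control.Concurrent.Async.Pool (mapReduce, wait, withTaskGroup)
import Control.Concurrent.STM (atomically)
import Data.Monoid (Sum (..))

-- | Hypothetical helper: sum a list of numbers with mapReduce on 8 slots.
sumAll :: [Int] -> IO Int
sumAll xs = withTaskGroup 8 $ \tg -> do
  -- Transaction 1: enqueue one task per element. It must commit before
  -- the scheduler can start any of the tasks.
  job <- atomically (mapReduce tg (map (pure . Sum) xs))
  -- Transaction 2 (inside 'wait'): block until the combined result exists.
  getSum <$> wait job
```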

State of package?

Since this package has so much in common with async, I'd appreciate a README that helps me (and others) place it within the wider Haskell ecosystem. async is actively maintained, but async-pool has seen very little activity in years.

Care to elaborate?

Consume input lazily OR allow "querying" of TaskGroup

While this library helps ensure that only a limited, pre-defined number of actions are evaluated in parallel, it still has one problem, especially with very large input data sets. If the input data set has N = 2,000,000 elements, this is going to create 2,000,000 asyncs, even though 99% of them might not be running concurrently at any given time. This still results in linear memory growth.

Even the most "lazy" function I could find, i.e. scatterFoldMapM, is only lazy with respect to the output (i.e. it doesn't try to collect ALL the output). However, if I'm not mistaken, even this function will create all asyncs immediately, even if it is not possible to run them concurrently.

Therefore, the title of this issue. I believe this can be handled in two possible ways:

  • Having a new function with the following type signature, which consumes the input lazily (is this another continuation? I'm not sure!):

    someFunc :: (MonadIO m, Monoid b) 
              => TaskGroup 
              -> m (IO a)                          -- ^ producer of monadic actions
              -> (Either SomeException a -> m b)   -- ^ consumer of results
              -> m b
    
  • Allowing one to query the TaskGroup to see how many slots are vacant. This allows one to write complex scheduling logic for when to push a task.

    vacantSlots :: TaskGroup -> Int
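For comparison, the first option can be approximated outside async-pool. The sketch below swaps in plain base primitives (QSem, forkIO, MVar) instead of a TaskGroup, and takes a lazy list of actions rather than the m (IO a) producer proposed above. The name boundedFoldMap is hypothetical. The next action is only taken from the list once a slot is free, so at most `slots` worker threads exist at a time. Results are combined in completion order, so only commutative monoids give deterministic results.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (modifyMVar_, newMVar, readMVar)
import Control.Concurrent.QSem (newQSem, signalQSem, waitQSem)
import Control.Exception (SomeException, finally, try)
import Control.Monad (forM_, replicateM_, void)

-- | Hypothetical helper: run the actions with at most @slots@ in flight,
-- consuming the input list lazily, and fold the consumer's results.
boundedFoldMap
  :: Monoid b
  => Int                              -- ^ number of slots (must be >= 1)
  -> [IO a]                           -- ^ lazily produced actions
  -> (Either SomeException a -> IO b) -- ^ consumer of results
  -> IO b
boundedFoldMap slots actions consume = do
  sem <- newQSem slots
  acc <- newMVar mempty
  forM_ actions $ \action -> do
    waitQSem sem                      -- block until a slot is vacant
    let worker = do
          r <- try action             -- capture the action's exception, if any
          b <- consume r
          modifyMVar_ acc (\s -> pure $! s <> b)
    -- Release the slot even if the consumer throws.
    void (forkIO (worker `finally` signalQSem sem))
  -- Reacquire every slot: this returns only after all workers have finished.
  replicateM_ slots (waitQSem sem)
  readMVar acc
```

If the consumer itself throws, that worker's contribution is lost (forkIO's default handler reports the exception), but its slot is still released.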
    

mapTasks_ does not work

Sorry about the vague title. I haven't looked into the cause, but I thought it was significant enough to report.

I have the following code:

liftIO $ withTaskGroup 50 $ \tg -> mapTasks_ tg tasks

This was the first time I tried async-pool, so when I ran it, I was quite surprised that the tasks did not run. I swapped in mapTasks for mapTasks_:

liftIO $ withTaskGroup 50 $ \tg -> void $ mapTasks tg tasks

And the tasks ran just fine.

Since the problem seems to be pretty clearly due to mapTasks_, I hope it's an easy fix for you. Otherwise, if you need a test case or something, let me know.

Build failure with up-to-date dependencies on GHC 7.6.3

I've run into the failure below; I think the culprit may be the monad-control stack. I also noticed there are no bounds on dependencies: I would (personally) recommend maintaining strict lower and upper bounds on all dependencies, since async-pool depends on a few non-trivial packages.

➜  ~  ci async-pool
Resolving dependencies...
Downloading fgl-5.5.1.0...
Configuring fgl-5.5.1.0...
Building fgl-5.5.1.0...
Installed fgl-5.5.1.0
Downloading async-pool-0.8.0...
Configuring async-pool-0.8.0...
Building async-pool-0.8.0...
Failed to install async-pool-0.8.0
Build log ( /Users/ozataman/.cabal/logs/async-pool-0.8.0.log ):
Configuring async-pool-0.8.0...
Building async-pool-0.8.0...
Preprocessing library async-pool-0.8.0...
[1 of 3] Compiling Control.Concurrent.Async.Pool.Async ( Control/Concurrent/Async/Pool/Async.hs, dist/build/Control/Concurrent/Async/Pool/Async.o )
[2 of 3] Compiling Control.Concurrent.Async.Pool.Internal ( Control/Concurrent/Async/Pool/Internal.hs, dist/build/Control/Concurrent/Async/Pool/Internal.o )

Control/Concurrent/Async/Pool/Internal.hs:256:33:
    Could not deduce (StM m b ~ StM m a0)
    from the context (Foldable t, Monoid b, MonadBaseControl IO m)
      bound by the type signature for
                 scatterFoldMapM :: (Foldable t, Monoid b, MonadBaseControl IO m) =>
                                    TaskGroup -> t (IO a) -> (Either SomeException a -> m b) -> m b
      at Control/Concurrent/Async/Pool/Internal.hs:(250,20)-(251,82)
    NB: `StM' is a type function, and may not be injective
    The type variable `a0' is ambiguous
    Possible fix: add a type signature that fixes these type variable(s)
    Expected type: IO (StM m b)
      Actual type: IO (StM m a0)
    In the second argument of `loop', namely `(run $ return mempty)'
    In the expression: loop run (run $ return mempty) (toList hs)
    In the second argument of `($)', namely
      `\ run -> loop run (run $ return mempty) (toList hs)'
Updating documentation index
/Users/ozataman/.cabal/share/doc/x86_64-osx-ghc-7.6.3/index.html
cabal: Error: some packages failed to install:
async-pool-0.8.0 failed during the building phase. The exception was:
ExitFailure 1

mapConcurrently is not reliable

I wrote a little wrapper:

mapConcurrentlyBounded :: Traversable t => Int -> (a -> IO b) -> t a -> IO (t b)
mapConcurrentlyBounded n f args = AsyncPool.withTaskGroup n $ \tg -> do
  AsyncPool.mapConcurrently tg f args

but it doesn't seem to work as expected

> mapConcurrentlyBounded 10 (\x -> threadDelay (x*100000) >> print x) [1..100]

This fails with an STM error.

mapConcurrentlyBounded :: Traversable t => Int -> (a1 -> IO a) -> t a1 -> IO (t a)
mapConcurrentlyBounded n f args = AsyncPool.withTaskGroup n $ \tg -> do
  AsyncPool.mapTasks tg (fmap f args)

works fine.

Expose `cancelAll`

I am experimenting with this package and cannot use withTaskGroup for my setting (because I'm in a custom monad). I can reimplement most of it from the outside, except that the cancelAll function isn't exposed. This seems like it is a useful operation on its own, and maybe it's just an oversight that it isn't already in the export list.
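For reference, a sketch of what such an outside reimplementation might look like in IO, assuming the scheduler loop runTaskGroup is run in a background thread with withAsync from the async package. The name myWithTaskGroup is hypothetical; in a custom monad the body would additionally have to be run in IO. What it cannot do without cancelAll is clean up the group's own tasks when the body exits: presumably queued tasks never start, and tasks that were already running may keep running.

```haskell
import qualified Control.Concurrent.Async as A
import Control.Concurrent.Async.Pool (TaskGroup, createPool, createTaskGroup, runTaskGroup)

-- | Hypothetical stand-in for 'withTaskGroup'. The scheduler runs in a
-- background thread that 'A.withAsync' cancels when the body returns or
-- throws. Tasks belonging to the group are NOT cancelled here; that is
-- the step that needs an exported 'cancelAll'.
myWithTaskGroup :: Int -> (TaskGroup -> IO b) -> IO b
myWithTaskGroup slots body = do
  pool <- createPool
  tg <- createTaskGroup pool slots
  A.withAsync (runTaskGroup tg) (\_ -> body tg)
```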

mapTasks threw fgl exception on completion

After switching from mapTasks_ to mapTasks (see #2), I ran a long database migration (3.44 hours). At the end, I found that the code threw an exception: "Match Exception, Node: 74575". This appears to be due to fgl in Data.Graph.Inductive.Graph.context, and fgl is in my cabal sandbox. Since async-pool depends on fgl, logically, the exception is probably coming via async-pool.

The code in mapTasks did complete, but the code after mapTasks did not complete. So, my guess is that it is part of the clean-up of mapTasks.

Looking at cleanupTask, I see two uses of fgl functions (suc and pre), and each of them has a reference to context, so those may be suspect.

The structure of my code is this:

run = do
  ...
  withTaskGroup 50 $ \tg -> void $ mapTasks tg $
    map f xs ++
    map g ys ++
    map (h tg . i) xs
  ...

h tg x = do
  ...
  mapTasks_ tg $ map j $ filter k zs
  ...

And, now that I actually look at my code again, I see mapTasks_ pop up unexpectedly. Looks like I forgot to change this one to mapTasks. I'm going to run the map (h tg . i) xs part again with mapTasks in h to see if I still get the exception.

makeDependent on already started tasks

I find makeDependent a little bit confusing: what happens if it is called in order to indicate that a particular task has dependencies when that task has already been started? For example

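import Control.Concurrent (threadDelay)
import Control.Concurrent.Async.Pool
import Control.Concurrent.STM (atomically)

main :: IO ()
main = do
    p <- createPool
    withTaskGroupIn p 2 $ \g -> do
        t1 <- async g task1
        t2 <- async g task2
        threadDelay 1000 -- wait 1ms for task2 to start
        atomically $
            makeDependent p (taskHandle t2) (taskHandle t1)
        wait t1
        wait t2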

for some task1, task2 :: IO ().

fails with ghc-9.4 for Stackage Nightly

Building library for async-pool-0.9.1..
[1 of 3] Compiling Control.Concurrent.Async.Pool.Async

/var/stackage/work/unpack-dir/unpacked/async-pool-0.9.1-525b46e6a39c80b7461e8cd146d540c27d47abc717b746ff34c21aa9b1c47743/Control/Concurrent/Async/Pool/Async.hs:714:16: error:
    • Couldn't match expected type: State# RealWorld
                                    -> (# State# RealWorld, a1 #)
                  with actual type: IO ()
    • In the first argument of ‘fork#’, namely ‘action’
      In the expression: fork# action s
      In the expression:
        case (fork# action s) of (# s1, tid #) -> (# s1, ThreadId tid #)
    |
714 |    case (fork# action s) of (# s1, tid #) -> (# s1, ThreadId tid #)
    |                ^^^^^^

/var/stackage/work/unpack-dir/unpacked/async-pool-0.9.1-525b46e6a39c80b7461e8cd146d540c27d47abc717b746ff34c21aa9b1c47743/Control/Concurrent/Async/Pool/Async.hs:719:22: error:
    • Couldn't match expected type: State# RealWorld
                                    -> (# State# RealWorld, a0 #)
                  with actual type: IO ()
    • In the second argument of ‘forkOn#’, namely ‘action’
      In the expression: forkOn# cpu action s
      In the expression:
        case (forkOn# cpu action s) of
          (# s1, tid #) -> (# s1, ThreadId tid #)
    |
719 |    case (forkOn# cpu action s) of (# s1, tid #) -> (# s1, ThreadId tid #)
    |                      ^^^^^^

Build failure with GHC 9.4 (please make a metadata revision)

This package fails to build with GHC 9.4.

Building library for async-pool-0.9.1..
[1 of 3] Compiling Control.Concurrent.Async.Pool.Async ( Control/Concurrent/Async/Pool/Async.hs, dist/build/Control/Concurrent/Async/Pool/Async.o, dist/build/Control/Concurrent/Async/Pool/Async.dyn_o )

Control/Concurrent/Async/Pool/Async.hs:714:16: error:
    • Couldn't match expected type: State# RealWorld
                                    -> (# State# RealWorld, a1 #)
                  with actual type: IO ()
    • In the first argument of ‘fork#’, namely ‘action’
      In the expression: fork# action s
      In the expression:
        case (fork# action s) of (# s1, tid #) -> (# s1, ThreadId tid #)
    |
714 |    case (fork# action s) of (# s1, tid #) -> (# s1, ThreadId tid #)
    |                ^^^^^^

Control/Concurrent/Async/Pool/Async.hs:719:22: error:
    • Couldn't match expected type: State# RealWorld
                                    -> (# State# RealWorld, a0 #)
                  with actual type: IO ()
    • In the second argument of ‘forkOn#’, namely ‘action’
      In the expression: forkOn# cpu action s
      In the expression:
        case (forkOn# cpu action s) of
          (# s1, tid #) -> (# s1, ThreadId tid #)
    |
719 |    case (forkOn# cpu action s) of (# s1, tid #) -> (# s1, ThreadId tid #)
    |                      ^^^^^^

Please make a metadata revision on async-pool-0.9.1 to correct the upper bound on base from < 5 to < 4.17.
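Independent of the metadata revision, the source itself can likely be made to compile on GHC 9.4. Both errors say that fork# and forkOn# now expect the underlying `State# RealWorld -> (# State# RealWorld, a #)` function rather than an IO value. A minimal sketch of a fix, assuming the code at lines 714 and 719 consists of async-style fork helpers (the names rawForkIO and rawForkOn are assumptions, borrowed from the async package): pattern-match on the IO constructor and pass the unwrapped function. Because IO is a newtype, the same code should also compile on older GHCs, where fork# accepted a value of any type.

```haskell
{-# LANGUAGE MagicHash, UnboxedTuples #-}
import GHC.Conc (ThreadId (..))
import GHC.Exts (Int (I#), fork#, forkOn#)
import GHC.IO (IO (..))

-- | Fork a thread without the default top-level exception handler.
-- Matching on the 'IO' constructor hands 'fork#' the raw state-passing
-- function, which is the argument type GHC 9.4 requires.
rawForkIO :: IO () -> IO ThreadId
rawForkIO (IO action) = IO $ \s ->
  case fork# action s of (# s1, tid #) -> (# s1, ThreadId tid #)

-- | The same change for the 'forkOn#' variant that pins the thread to a
-- capability.
rawForkOn :: Int -> IO () -> IO ThreadId
rawForkOn (I# cpu) (IO action) = IO $ \s ->
  case forkOn# cpu action s of (# s1, tid #) -> (# s1, ThreadId tid #)
```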
