jwiegley / async-pool
License: Other
The runTaskGroup function can crash with a "Match Exception" error when an async task is created and cancelled before it is scheduled to run. This can lead to unexpected behavior, such as subsequent async tasks in the task group not being executed.
Consider the following example:
import Control.Concurrent.Async as Async
import Control.Concurrent.Async.Pool as Pool
import Control.Exception
do
  pool <- Pool.createPool
  group <- Pool.createTaskGroup pool 2
  mainThread <- Async.async $ runTaskGroup group `catch` (\(e :: SomeException) -> print e)
  Pool.async group (print "x") >>= Pool.cancel
  Pool.async group (print "y")
In this code, the first async task is created and immediately cancelled before it has a chance to be scheduled. As a result, "y" is never printed. The error message looks like this:
Match Exception, Node: 0
CallStack (from HasCallStack):
error, called at ./Data/Graph/Inductive/Graph.hs:385:26 in fgl-5.8.2.0-AeC32udtCeIHgQIC5kCLt:Data.Graph.Inductive.Graph
The following code consistently reproduces this issue:
do
  pool <- Pool.createPool
  group <- Pool.createTaskGroup pool 2
  x <- Pool.async group $ print "x"
  Pool.cancel x
  Pool.runTaskGroup group
Suppose I have the following piece of code:
import Control.Concurrent.Async.Pool (Concurrently(..), withTaskGroup)
main :: IO ()
main = do
  let xs = [1, 2, 3, 5, 6] :: [Int]
  s <- withTaskGroup 4
         (runConcurrently (sum <$> traverse (Concurrently . const . pure) xs))
  print s
It hangs whenever the pool size is smaller than the number of tasks. For example, the code above hangs, but if we change the pool size from 4 to 5, it doesn't.
Pinging @spl
If mapTasks invokes mapTasks, deadlock can occur. Scenario:
1. mapTasks, which enqueues all of its jobs immediately in the execution graph, starts executing the first N tasks. It also waits until all tasks are finished before returning to the caller.
2. If one of those tasks calls mapTasks, it will enqueue M more tasks, and wait for them to complete as well. However, the inner mapTasks can starve because no execution slots may ever free up: all jobs are now blocked, waiting on future jobs, which are waiting on the blocked jobs to finish.
Solution: If we notice a call to wait in a thread whose threadId matches that of a running job, raise an exception that there may be a deadlock scenario.
We could either raise this exception always (which makes it easier for the developer to avoid this situation), or we could only raise it if, when wait is called, there are no available slots.
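The "always raise" variant of that check can be sketched with nothing but base. Everything named here (Workers, NestedWait, asWorker, guardedWait) is hypothetical and not part of async-pool; an MVar stands in for the task handle. The idea is only that each job registers its ThreadId while it runs, and a wait issued from a registered thread is rejected instead of blocking.

```haskell
import Control.Concurrent (ThreadId, forkIO, myThreadId)
import Control.Concurrent.MVar
  (MVar, modifyMVar_, newEmptyMVar, newMVar, putMVar, readMVar, takeMVar)
import Control.Exception (Exception, bracket_, throwIO, try)
import Data.List (delete)

-- Hypothetical exception; async-pool does not define this.
data NestedWait = NestedWait ThreadId deriving Show
instance Exception NestedWait

-- Hypothetical registry of threads that are currently running pool jobs.
type Workers = MVar [ThreadId]

-- Run a job while its thread is registered as a worker.
asWorker :: Workers -> IO a -> IO a
asWorker ws job = do
  tid <- myThreadId
  bracket_ (modifyMVar_ ws (pure . (tid :)))
           (modifyMVar_ ws (pure . delete tid))
           job

-- Wait on a result, refusing to block when called from inside a job.
guardedWait :: Workers -> MVar a -> IO a
guardedWait ws result = do
  tid  <- myThreadId
  busy <- readMVar ws
  if tid `elem` busy then throwIO (NestedWait tid) else readMVar result

main :: IO ()
main = do
  ws      <- newMVar []
  inner   <- newEmptyMVar :: IO (MVar Int)  -- never filled: a wait would hang
  outcome <- newEmptyMVar
  _ <- forkIO $ do
    r <- try (asWorker ws (guardedWait ws inner)) :: IO (Either NestedWait Int)
    putMVar outcome (either (const "nested wait rejected") show r)
  takeMVar outcome >>= putStrLn
```

The second variant (raise only when no slots are free) would additionally need the pool's free-slot count at the time of the call, which this sketch does not model.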
Does this library have a "lifted" version similar to lifted-async or UnliftIO.async?
Version 0.9.0.2 on Hackage requires base < 4.12, which breaks this package on Nix: https://hydra.nixos.org/build/86369536/nixlog/1
Could you release a newer version sometime?
It was not immediately obvious to me how to construct a correct invocation of mapReduce. Its signature shows it returning an STM (Async a), so it seemed not too far-fetched to try to wait for it in the same STM transaction that created it:
withTaskGroup 8 \tg -> atomically (mapReduce tg tasklist >>= waitSTM)
but that fails with the dreaded "thread blocked indefinitely" error. It turns out (after finding an example in one of the tests) that the correct invocation is rather:
withTaskGroup 8 \tg -> atomically (mapReduce tg tasklist) >>= wait
That is, the waitSTM (via wait) must run in a separate atomic transaction. Some text showing correct usage may also be helpful to others, perhaps with an explanation motivating the need for the separation (but a simple dictate may suffice).
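A likely reason for the separation is ordinary STM semantics: nothing written inside a transaction is visible to other threads until it commits, so a scheduler cannot start a task that was enqueued by a transaction that is still waiting on that task. The sketch below illustrates this with a hypothetical stand-in queue (Queue, submit, worker) built on stm; it is not async-pool's implementation.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.STM
import Control.Monad (forever, join)

-- Hypothetical stand-in for a task group: a queue of pending IO actions.
type Queue = TVar [IO ()]

-- Enqueue a job and return a slot that the worker fills in later.
submit :: Queue -> IO a -> STM (TMVar a)
submit q job = do
  slot <- newEmptyTMVar
  modifyTVar' q (++ [job >>= atomically . putTMVar slot])
  pure slot

-- Worker: pop one job per transaction, then run it outside STM.
worker :: Queue -> IO ()
worker q = forever $ join $ atomically $ do
  jobs <- readTVar q
  case jobs of
    []       -> retry
    (j : js) -> writeTVar q js >> pure j

main :: IO ()
main = do
  q <- newTVarIO []
  _ <- forkIO (worker q)
  -- Transaction 1 commits the enqueue; only then can the worker see the job.
  slot <- atomically (submit q (pure (6 * 7 :: Int)))
  -- Transaction 2 blocks until the worker has filled the slot.
  atomically (readTMVar slot) >>= print
  -- By contrast, `atomically (submit q job >>= readTMVar)` cannot finish:
  -- the slot is empty inside the transaction, and each retry discards the
  -- uncommitted enqueue, so the worker never sees the job.
```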
Since this package shares so much in common with async, I'd appreciate a README that helps me (and others) place this package in the grand scheme of the Haskell ecosystem. async is actively maintained but async-pool has had very little action in years.
Care to elaborate?
While this library helps in ensuring that only a limited/pre-defined number of actions are evaluated in parallel, it still has one problem (especially with very large input data-sets). If the input data-set has N=2,000,000, this is going to create 2,000,000 asyncs, although 99% of them might not be getting concurrently evaluated. This still results in linear memory growth.
Even the most "lazy" function I could find, i.e. scatterFoldMapM, is only lazy with respect to the output (i.e. it doesn't try to collect ALL the output). However, if I'm not mistaken, even this function will create all asyncs immediately, even if it is not possible to run them concurrently.
Therefore, the title of this issue. I believe this can be handled in two possible ways:
Having a new function with the following type signature, which consumes the input lazily (is this another continuation? I'm not sure!):
someFunc :: (MonadIO m, Monoid b)
         => TaskGroup
         -> m (IO a)                         -- ^ producer of monadic actions
         -> (Either SomeException a -> m b)  -- ^ consumer of results
         -> m b
Allowing one to query the TaskGroup to see how many slots are vacant. This allows one to write complex scheduling logic for when to push a task.
vacantSlots :: TaskGroup -> Int
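To make the first option concrete, here is a minimal sketch of lazy, bounded consumption using only base. It does not use async-pool at all: boundedFoldMap is a hypothetical name, a QSem stands in for the pool's slots, and the input is a plain lazy list rather than the m (IO a) producer proposed above. At most n threads exist at any time, because a new thread is forked only after a slot has been acquired.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (modifyMVar_, newMVar, readMVar)
import Control.Concurrent.QSem (newQSem, signalQSem, waitQSem)
import Control.Exception (SomeException, finally, try)
import Control.Monad (forM_, replicateM_)
import Data.Monoid (Sum (..))

-- Hypothetical helper, not part of async-pool. Results are combined in
-- completion order, so the monoid should be commutative.
boundedFoldMap
  :: Monoid b
  => Int                              -- maximum number of jobs in flight
  -> [IO a]                           -- lazily consumed list of actions
  -> (Either SomeException a -> IO b) -- consumer of results
  -> IO b
boundedFoldMap n actions consume = do
  slots <- newQSem n
  acc   <- newMVar mempty
  forM_ actions $ \act -> do
    waitQSem slots -- block until a slot is vacant
    forkIO $
      (try act >>= consume >>= \b -> modifyMVar_ acc (pure . (<> b)))
        `finally` signalQSem slots
  -- Drain: once all n slots can be taken, every worker has finished.
  replicateM_ n (waitQSem slots)
  readMVar acc

main :: IO ()
main = do
  total <- boundedFoldMap 4 (map pure [1 .. 1000 :: Int])
             (pure . either (const mempty) Sum)
  print (getSum total) -- prints 500500
```

This sketch ignores asynchronous exceptions delivered to the submitting thread and drops a result if the consumer itself throws; a production version would need to handle both.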
Sorry about the vague title. I haven't looked into the cause, but I thought it was significant enough to report.
I have the following code:
liftIO $ withTaskGroup 50 $ \tg -> mapTasks_ tg tasks
This was the first time I tried async-pool, so when I ran it, I was quite surprised that the tasks did not run. I swapped in mapTasks for mapTasks_:
liftIO $ withTaskGroup 50 $ \tg -> void $ mapTasks tg tasks
And the tasks ran just fine.
Since the problem seems to be pretty clearly due to mapTasks_, I hope it's an easy fix for you. Otherwise, if you need a test case or something, let me know.
I've run into the failure below; I think the culprit may be the monad-control stack. I also noticed there are no bounds on dependencies: I would (personally) recommend maintaining strict lower and upper bounds on all dependencies, since async-pool depends on a few non-trivial packages.
➜ ~ ci async-pool
Resolving dependencies...
Downloading fgl-5.5.1.0...
Configuring fgl-5.5.1.0...
Building fgl-5.5.1.0...
Installed fgl-5.5.1.0
Downloading async-pool-0.8.0...
Configuring async-pool-0.8.0...
Building async-pool-0.8.0...
Failed to install async-pool-0.8.0
Build log ( /Users/ozataman/.cabal/logs/async-pool-0.8.0.log ):
Configuring async-pool-0.8.0...
Building async-pool-0.8.0...
Preprocessing library async-pool-0.8.0...
[1 of 3] Compiling Control.Concurrent.Async.Pool.Async ( Control/Concurrent/Async/Pool/Async.hs, dist/build/Control/Concurrent/Async/Pool/Async.o )
[2 of 3] Compiling Control.Concurrent.Async.Pool.Internal ( Control/Concurrent/Async/Pool/Internal.hs, dist/build/Control/Concurrent/Async/Pool/Internal.o )
Control/Concurrent/Async/Pool/Internal.hs:256:33:
Could not deduce (StM m b ~ StM m a0)
from the context (Foldable t, Monoid b, MonadBaseControl IO m)
bound by the type signature for
scatterFoldMapM :: (Foldable t, Monoid b, MonadBaseControl IO m) =>
TaskGroup -> t (IO a) -> (Either SomeException a -> m b) -> m b
at Control/Concurrent/Async/Pool/Internal.hs:(250,20)-(251,82)
NB: `StM' is a type function, and may not be injective
The type variable `a0' is ambiguous
Possible fix: add a type signature that fixes these type variable(s)
Expected type: IO (StM m b)
Actual type: IO (StM m a0)
In the second argument of `loop', namely `(run $ return mempty)'
In the expression: loop run (run $ return mempty) (toList hs)
In the second argument of `($)', namely
`\ run -> loop run (run $ return mempty) (toList hs)'
Updating documentation index
/Users/ozataman/.cabal/share/doc/x86_64-osx-ghc-7.6.3/index.html
cabal: Error: some packages failed to install:
async-pool-0.8.0 failed during the building phase. The exception was:
ExitFailure 1
I wrote a little wrapper:
mapConcurrentlyBounded n f args = AsyncPool.withTaskGroup n $ \tg -> do
  AsyncPool.mapConcurrently tg f args
but it doesn't seem to work as expected:
> mapConcurrentlyBounded 10 (\x -> threadDelay (x*100000) >> print x) [1..100]
This fails with an STM error. The following version, by contrast,
mapConcurrentlyBounded :: Traversable t => Int -> (a1 -> IO a) -> t a1 -> IO (t a)
mapConcurrentlyBounded n f args = AsyncPool.withTaskGroup n $ \tg -> do
  AsyncPool.mapTasks tg (fmap f args)
works fine.
I am experimenting with this package and cannot use withTaskGroup for my setting (because I'm in a custom monad). I can reimplement most of it from the outside, except that the cancelAll function isn't exposed. This seems like a useful operation on its own, and maybe it's just an oversight that it isn't already in the export list.
After switching from mapTasks_ to mapTasks (see #2), I ran a long database migration (3.44 hours). At the end, I found that the code threw an exception: "Match Exception, Node: 74575". This appears to be due to fgl in Data.Graph.Inductive.Graph.context, and fgl is in my cabal sandbox. Since async-pool depends on fgl, logically, the exception is probably coming via async-pool.
The code in mapTasks did complete, but the code after mapTasks did not complete. So, my guess is that it is part of the clean-up of mapTasks.
Looking at cleanupTask, I see two uses of fgl functions (suc and pre), and each of them has a reference to context, so those may be suspect.
The structure of my code is this:
run = do
  ...
  withTaskGroup 50 $ \tg -> void $ mapTasks tg $
    map f xs ++
    map g ys ++
    map (h tg . i) xs
  ...

h tg x = do
  ...
  mapTasks_ tg $ map j $ filter k zs
  ...
And, now that I actually look at my code again, I see mapTasks_ pop up unexpectedly. Looks like I forgot to change this one to mapTasks. I'm going to run the map (h tg . i) xs part again with mapTasks in h to see if I still get the exception.
I find makeDependent a little bit confusing: what happens if it is called in order to indicate that a particular task has dependencies when that task has already been started? For example:
main :: IO ()
main = do
  p <- createPool
  withTaskGroupIn p 2 $ \g -> do
    t1 <- async g task1
    t2 <- async g task2
    threadDelay 1000 -- wait 1ms for task2 to start
    atomically $
      makeDependent p (taskHandle t2) (taskHandle t1)
    wait t1
    wait t2
for some task1, task2 :: IO ().
Building library for async-pool-0.9.1..
[1 of 3] Compiling Control.Concurrent.Async.Pool.Async
/var/stackage/work/unpack-dir/unpacked/async-pool-0.9.1-525b46e6a39c80b7461e8cd146d540c27d47abc717b746ff34c21aa9b1c47743/Control/Concurrent/Async/Pool/Async.hs:714:16: error:
• Couldn't match expected type: State# RealWorld
-> (# State# RealWorld, a1 #)
with actual type: IO ()
• In the first argument of ‘fork#’, namely ‘action’
In the expression: fork# action s
In the expression:
case (fork# action s) of (# s1, tid #) -> (# s1, ThreadId tid #)
|
714 | case (fork# action s) of (# s1, tid #) -> (# s1, ThreadId tid #)
| ^^^^^^
/var/stackage/work/unpack-dir/unpacked/async-pool-0.9.1-525b46e6a39c80b7461e8cd146d540c27d47abc717b746ff34c21aa9b1c47743/Control/Concurrent/Async/Pool/Async.hs:719:22: error:
• Couldn't match expected type: State# RealWorld
-> (# State# RealWorld, a0 #)
with actual type: IO ()
• In the second argument of ‘forkOn#’, namely ‘action’
In the expression: forkOn# cpu action s
In the expression:
case (forkOn# cpu action s) of
(# s1, tid #) -> (# s1, ThreadId tid #)
|
719 | case (forkOn# cpu action s) of (# s1, tid #) -> (# s1, ThreadId tid #)
| ^^^^^^
It has been quite some time since 0.9.0.2, and there have been multiple commits since. Perhaps it is time for a new release?
I would also like to see this package in Stackage (https://github.com/commercialhaskell/stackage/blob/master/README.md).
This package fails to build with GHC 9.4.
Building library for async-pool-0.9.1..
[1 of 3] Compiling Control.Concurrent.Async.Pool.Async ( Control/Concurrent/Async/Pool/Async.hs, dist/build/Control/Concurrent/Async/Pool/Async.o, dist/build/Control/Concurrent/Async/Pool/Async.dyn_o )
Control/Concurrent/Async/Pool/Async.hs:714:16: error:
• Couldn't match expected type: State# RealWorld
-> (# State# RealWorld, a1 #)
with actual type: IO ()
• In the first argument of ‘fork#’, namely ‘action’
In the expression: fork# action s
In the expression:
case (fork# action s) of (# s1, tid #) -> (# s1, ThreadId tid #)
|
714 | case (fork# action s) of (# s1, tid #) -> (# s1, ThreadId tid #)
| ^^^^^^
Control/Concurrent/Async/Pool/Async.hs:719:22: error:
• Couldn't match expected type: State# RealWorld
-> (# State# RealWorld, a0 #)
with actual type: IO ()
• In the second argument of ‘forkOn#’, namely ‘action’
In the expression: forkOn# cpu action s
In the expression:
case (forkOn# cpu action s) of
(# s1, tid #) -> (# s1, ThreadId tid #)
|
719 | case (forkOn# cpu action s) of (# s1, tid #) -> (# s1, ThreadId tid #)
| ^^^^^^
Please make a metadata revision on async-pool-9.0.1 to correct the upper bound on base from < 5 to < 4.17.