Coder Social home page Coder Social logo

ki's Introduction

ki ki-unlifted
GitHub CI
Hackage Hackage
Stackage LTS Stackage LTS
Stackage Nightly Stackage Nightly
Dependencies Dependencies

Overview

ki is a lightweight structured-concurrency library inspired by many other projects and blog posts:

A previous version of ki also included a mechanism for soft-cancellation/graceful shutdown, which took inspiration from:

However, this feature was removed (perhaps temporarily) because the design of the API was unsatisfactory.

Documentation

Hackage documentation

Example: Happy Eyeballs

The Happy Eyeballs algorithm is a particularly common example used to demonstrate the advantages of structured concurrency, because it is simple to describe, but can be surprisingly difficult to implement.

The problem can be abstractly described as follows: we have a small set of actions to run, each of which can take arbitrarily long, or fail. Each action is a different way of computing the same value, so we only need to wait for one action to return successfully. We don't want to run the actions one at a time (because that is likely to take too long), nor all at once (because that is an improper use of resources). Rather, we will begin executing the first action, then wait 250 milliseconds, then begin executing the second, and so on, until one returns successfully.

There are of course a number of ways to implement this algorithm. We'll do something non-optimal, but simple. Let's get the imports out of the way first.

import Control.Concurrent
import Control.Monad (when)
import Control.Monad.STM (atomically)
import Data.Function ((&))
import Data.Functor (void)
import Data.List qualified as List
import Data.Maybe (isJust)
import Ki qualified

Next, let's define a staggeredSpawner helper that implements the majority of the core algorithm: given a list of actions, spawn them all at 250 millisecond intervals. After all actions are spawned, we block until all of them have returned.

staggeredSpawner :: [IO ()] -> IO ()
staggeredSpawner actions = do
  Ki.scoped \scope -> do
    actions
      & map (\action -> void (Ki.fork scope action))
      & List.intersperse (threadDelay 250_000)
      & sequence_
    atomically (Ki.awaitAll scope)

And finally, we wrap this helper with happyEyeballs, which accepts a list of actions, and returns when one action returns successfully, or returns Nothing if all actions fail. Note that in a real implementation, we may want to consider what to do if an action throws an exception. Here, we trust each action to signal failure by returning Nothing.

happyEyeballs :: [IO (Maybe a)] -> IO (Maybe a)
happyEyeballs actions = do
  resultVar <- newEmptyMVar

  let worker action = do
        result <- action
        when (isJust result) do
          _ <- tryPutMVar resultVar result
          pure ()

  Ki.scoped \scope -> do
    _ <-
      Ki.fork scope do
        staggeredSpawner (map worker actions)
        tryPutMVar resultVar Nothing
    takeMVar resultVar

ki's People

Contributors

mitchellwrosen avatar tristancacqueray avatar tstat avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

ki's Issues

`awaitAll` shouldn't block forever if scoped is closing/closed

Minor oversight: currently awaitAll will just block forever if called on a closing or closed scope.

If the scope is closed, we should just throw an exception.

If the scope is closing, then the user is doing something weird and wrong:

  • We know it's not the parent that's awaitAll-ing, since the parent is closing the scope.
  • It shouldn't be a child - calling awaitAll really never makes sense to call from a child thread on its own scope, since it (semantically) would just block forever. So, I think we should probably just block here, waiting for the TVar to become 0, which it won't, because we'll soon get hit by a ScopeClosing.
  • The third possibility, I suppose, is that the user has smuggled this Scope away to some non-child thread that's calling awaitAll. I kind of think it's ok to again just block forever here, waiting for the TVar to become 0, which it won't. If we eventually get hit by a BlockedIndefinitelyOnSTM, then ok.

Reconsider the behavior of trying to fork a new thread in a closing scope

Some background: conceptually, there are three states a scope can be in:

  • "open", allowing new threads to be created within it
  • "closing", because we reached the end of a scoped block naturally, or because we got hit by an exception, and are going to proceed to killing all living children and waiting for them to terminate
  • "closed", because we're outside the callback in which the scope was valid, like any regular resource acquired in bracket-style

Clearly, we do want to disallow this bogus program, either with the type system (meh) or via a runtime exception:

scope <- Ki.scoped pure
Ki.fork scope whatever -- using a scope outside its callback

On to the implementation. Each scope keeps an int count of the threads that are about to start, with the sentinel value -1 meaning closed/closing. When we go to fork a thread, if this counter is not -1, we bump the counter, then spawn the thread, then decrement the counter. If the counter is -1, we throw a runtime exception (error "ki: scope closed").

This design makes closing a scope pretty simple: wait until there are 0 children about to start, then prevent new children from starting by writing -1, then kill all of the living children.

The problem (potentially) is that there's not actually a "closing" state that's distinguishable from "closed". So while we do prevent bogus programs like the above from spawning a thread in a closed scope, it seems wrong to punish code that attempts to spawn a thread into a closing scope in the same way.

Some options:

  1. (straw man) Make fork have type fork :: Scope -> IO a -> IO (Maybe (Thread a)), and return Nothing if we try to fork a thread in a closing scope. I don't think this API is good, but it's conceptually what we are after. The current behavior (to reiterate/summarize the above) is to throw a lazy runtime exception with error "ki: scope closing" rather than return Nothing.
  2. Make Thread a two-variant sum type, with a DidntActuallyMakeTheThreadBecauseTheScopeWasClosing variant. We'll have to decide what to do if you await such a thing.
  3. Tweak the teardown dance to actually continue to allow threads to be created in a closing scope, if only to throw a ScopeClosing exception to them soon after (which is how we kill children). This doesn't seem meaningfully different to (2).
  4. Something else, or nothing?

How to define a daemon thread of type IO Void?

I'm sure I'm missing something obvious but for a thread that "never returns" fork_ needs an IO Void.

How do you make an IO Void? I mean I know it could be undefined which is of type a but that seems suboptimal!

The only example I could find is in your test suite and you're using throwIO (which is also of type a, of course). Is that what you had in mind, that every daemon thread that "never returns" would end with a throw?

Consider removing `Thread`

Currently, the thread API is just:

fork :: Scope -> IO a -> IO (Thread a)
await :: Thread a -> IO a
awaitSTM :: Thread a -> STM a
awaitFor :: Thread a -> Duration -> IO (Maybe a)

Since you can't cancel a single thread (should you be able to...?), we could instead just represent a thread as the STM a that awaits its return value.

fork :: Scope -> IO a -> IO (STM a)

await and awaitFor could still be exported, albeit with kind of weird definitions that don't exactly relate to any concepts in ki specifically.

await :: STM a -> IO a
await = atomically

awaitFor :: STM a -> Duration -> IO (Maybe a)
awaitFor x s = timeoutSTM s (pure . Just <$> x) (pure Nothing)

The benefit here would be removing one noun from the interface. Not exactly sure if that's desirable in this case :)

How to handle threads that shouldn't be killed

We have been (ab-)using ki in the following way:

  • We have a medium length job that runs
  • During the execution of the job, we have some tasks that need to be completed but not promptly (e.g. updating the database)
  • To handle this, we pass around a Scope which lives for the lifetime of the job and into which we fork background tasks

This isn't very tidy:

  • It's unclear what the lifetime of the scope is
  • You had better call awaitAll on the scope before you terminate, otherwise you might kill things that need to finish

What we're doing seems not ideal, but I'm not sure how to do it better.

Ideas:

  • Track it on the thread level: some threads are okay to kill, some aren't. A Scope could track which ones can be killed and which ones have to be awaited.
  • Track it on the scope level: some scopes kill their children, some await them. We could have different scoped functions that create Scopes that behave differently. Maybe these should be different types of Scope.
  • This is just not an appropriate use of structured concurrency, and you should have something more like an explicit job queue.

Unify `fork` and `async`

Current API

data Thread a
fork :: Scope -> IO a -> IO (Thread a)
async :: Scope -> IO a -> IO (Thread (Either SomeException a))
await :: Thread a -> IO a

Idea 1: allow user to pick e in async

data Thread a
fork :: Scope -> IO a -> IO (Thread a)
async :: Exception e => Scope -> IO a -> IO (Thread (Either e a))
await :: Thread a -> IO a

This would allow users to pick some root exception, not necessarily SomeException, that is "checked" - and all other higher exceptions (including all async exceptions, always) are propagated to the parent, like fork.

One downside here is it moves async in the direction of fork, and in fact async @Void would be basically equivalent to fork, so it might be a bit difficult to separate the two functions in one's mind.

Idea 2: move the e into Thread

data Thread e a
fork :: Scope -> IO a -> IO (Thread Void a)
async :: Exception e => Scope -> IO a -> IO (Thread e a)
await :: Thread e a -> IO (Either e a)

This would move the concept of what exception a thread might be expected to throw into its type, which is kind of nice. For example, a Thread IOException Int is a thread that may succeed with an Int, or fail with an IOException, but any other exception is truly unexpected, not evident in its type, and would be propagated to its parent.

One downside here is callers of await on a Thread Void have to deal with an absurd Left Void case explicitly. This can be worked around with a type family, which adds complexity to the API, and requires users to apply a function in their heads when reading documentation:

type family Result e a where
  Result Void a = a
  Result e a = Either e a

await :: Thread e a -> IO (Result e a)

Idea 3: fully unify fork and async

data Thread e a

fork :: SyncException e => Scope -> IO a -> IO (Thread e a)
await :: Thread e a -> IO (Result e a)

class Exception e => SyncException e where
  type Result e a :: Type
  notExported :: Either e a -> Result e a

instance {-# OVERLAPS #-} SyncException Void where
  type Result Void a = a
  notExported = either absurd id

instance Exception e => SyncException e where
  type Result e a = Either e a
  notExported = id 

This is like idea 2, but gets rid of the fork/async distinction entirely. Unfortunately, this means we need a type class to allow us to return just an a, not a Either Void a, in the Thread Void case, which was previously accomplished by having two separate functions.

I've named the type class SyncException here, just as a reminder that in ki, asynchronous exceptions are always propagated, but it could really be called anything, including (confusingly) Exception.

Hang with STM & two scopes

The following program hangs indefinitely:

{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE DeriveAnyClass #-}

module Main where

import Control.Monad
import Control.Exception
import Control.Concurrent
import Control.Concurrent.STM
import Ki

data Bang = Bang
  deriving (Exception, Show)

main :: IO ()
main = 
  scoped \scope1 -> do
    explodingThread <- fork scope1 do
      threadDelay 1_000_000
      throwIO Bang

    scoped \ki -> do
      c <- newTChanIO

      threadId <- fork ki do
        forever do
          atomically do
            readTChan c

      putStrLn "Waiting"
      threadDelay 10_000_000

I would expect this to be fine, and die with the Bang exception. If we use a single scope, that's exactly what happens, so it seems to be some kind of interation between two scopes.

Should `ki` force the return value of a thread to WHNF?

I wonder if ki should force a thread's result to WHNF before returning.

This would be inconsistent with async, and probably other threading libraries, so by the principle of least surprise, maybe we shouldn't do this.

However, it might be nice to assist users in performing (some) computation on the dedicated thread they explicitly spawn, rather than let them accidentally conclude a background thread with some expensive thunk that's computed by the first awaiter who peeks at it.

Clarify exception propagation behaviour

I had some questions reading the (generally good!) documentation.

  • Exceptions propagated from child to parent and vice versa are presumably always thrown asynchronously? (I think this is the only possible thing, I just had to stop and think and I think it could be written down)
  • Which exception is used to kill child threads? I think it's ScopeClosing, but it's not exported. Maybe you never need it? But I could imagine a situation where you want to catch various other async exceptions but not ScopeClosing.

How to kill a single thread

Would it be possible to add a kill :: Thread a -> IO () function? And could this be ignored by the thread scope?

#7 mention being able to cancel a thread, is there a reason not to?

Nested scope stuck on Data.ByteString.hGetLine

Hello, it looks like a scope may not terminate properly when reading the output of an external process using hGetLine.

When running the following example I get:

$ cabal run ki-blocked-repro.hs
Cat is running...
Cat is running...
Exiting main scope...
  C-c C-ccabal-script-ki-blocked-repro.hs: thread blocked indefinitely in an STM transaction

The program hangs until it is interrupted with Ctrl-C.

When trying to minify the reproducer, I noticed that removing the nested scope, by replacing Ki.scoped catAction with catAction scope, the issue disappear: the program exit cleanly. Thus it seems like a bug in ki, but I can't tell the root cause, and perhaps it's related to the process library and how the exceptions are masked?

{- cabal:
build-depends: base, bytestring, ki, typed-process
ghc-options: -threaded -rtsopts -with-rtsopts=-N -with-rtsopts=-T
-}
module Main where

import Control.Concurrent (threadDelay)
import Control.Monad (forever)
import qualified Data.ByteString as BS
import qualified Ki as Ki
import qualified System.Process.Typed as ProcessTyped

catAction :: Ki.Scope -> IO ()
catAction scope =
    ProcessTyped.withProcessWait_ cmd $ \p -> do
        Ki.fork_ scope $ forever $ BS.hGetLine (ProcessTyped.getStdout p)
        forever $ do
            putStrLn $ "Cat is running..."
            threadDelay 300000
  where
    cmd =
        ProcessTyped.setStdout ProcessTyped.createPipe $
            ProcessTyped.proc "cat" []

main :: IO ()
main = do
    Ki.scoped $ \scope -> do
        _ <- scope `Ki.fork` Ki.scoped catAction
        threadDelay 500000
        putStrLn "Exiting main scope..."
    putStrLn "This is the end."

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.