
s_mach.concurrent: Futures utility library

s_mach.concurrent is an open-source Scala library that provides asynchronous serial and parallel execution flow control primitives for working with asynchronous tasks. An asynchronous task consists of two or more calls to function(s) that return a future result A => Future[B] instead of the result A => B. s_mach.concurrent also provides utility & convenience code for working with scala.concurrent.Future.

  • Adds concurrent flow control primitives async and async.par for performing fixed size heterogeneous (tuple) and variable size homogeneous (collection) asynchronous tasks. These primitives:

    • Allow enabling optional progress reporting, failure retry and/or throttle control for asynchronous tasks

    • Ensure proper sequencing of returned futures, e.g. given f: Int => Future[String]:

      • List(1,2,3).async.map(f) returns Future[List[String]]

      • async.par.run(f(1),f(2),f(3)) returns Future[(String,String,String)]

    • Ensure fail-immediate sequencing of future results (see the 'Under the hood: Merge function' section for details)

    • Ensure all exceptions generated during asynchronous task processing can be retrieved (Future.sequence returns only the first)

  • collection.async and collection.async.par support collection operations such as map, flatMap and foreach on asynchronous functions, i.e. A => Future[B]

  • async.par.run(future1, future2, ...) supports running a fixed size heterogeneous asynchronous task (of up to 22 futures) in parallel

  • Adds ScheduledExecutionContext, a Scala interface wrapper for java.util.concurrent.ScheduledExecutorService that provides for scheduling delayed and periodic tasks

  • Adds non-blocking concurrent control primitives such as Barrier, Latch, Lock and Semaphore

  • Provides convenience methods for writing more readable, concise and DRY concurrent code, such as Future.get, Future.toTry and Future.fold (a brief sketch follows this list)
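
As a quick illustration of these helpers, the following is a minimal sketch that assumes the imports and ScheduledExecutionContext from the 'Imports for Examples' section below. The semantics of get and getTry are inferred from Examples 9 and 10 (get blocks for the result; getTry blocks and captures the outcome as a Try), and Future.delayed appears in Example 4:

import scala.util._
import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import s_mach.concurrent._

implicit val scheduledExecutionContext = ScheduledExecutionContext(2)

// Block for the result of a future (a concise alternative to Await.result)
val n: Int = Future { 1 + 1 }.get
// Block and capture success or failure as a Try instead of throwing
val t: Try[Int] = Future { 2 / 0 }.getTry
// Complete with the supplied value after the specified delay
val delayed: Future[Boolean] = Future.delayed(100.millis)(true)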

Include in SBT

  1. Add to build.sbt

    libraryDependencies += "net.s_mach" %% "concurrent" % "1.0.2"

    Note: s_mach.concurrent is currently compiled only for Scala 2.11 (though 2.10.4 support can be added if there is interest).

Imports for Examples

All code examples assume the following imports:

import java.net.SocketTimeoutException
import scala.util._
import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import s_mach.concurrent._
import s_mach.concurrent.util._

implicit val scheduledExecutionContext = ScheduledExecutionContext(2)
case class Item(id: String, value: Int, relatedItemId: String)
// Simulated remote operations used throughout the examples. Each sleeps to
// mimic latency, prints its id and either returns a result or fails.
def read(id: String) : Future[Item] = Future { Thread.sleep(1000); println(id); Item(id,id.toInt,(id.toInt+1).toString) }
def readFail(id: String) : Future[Item] = Future { Thread.sleep(1000); println(id); throw new RuntimeException(id.toString) }
def longRead(id: String) : Future[Item] = Future { Thread.sleep(2000); println(id); Item(id,id.toInt,(id.toInt+1).toString) }
def write(id: String, item: Item) : Future[Boolean] = Future { Thread.sleep(1000); println(id); true }
def writeFail(id: String, item: Item) : Future[Boolean] = Future { Thread.sleep(1000); println(id); throw new RuntimeException(id.toString) }

Asynchronously transform or traverse collections

A common task when working with futures is transforming or traversing a collection, in serial or parallel, using a function that returns a future. With only a few levels of nesting, the standard idioms for accomplishing this lead to code that is difficult to read. In the following example, a collection of ten identifiers is grouped into batches of two. The batches are read serially, while the identifiers within each batch are read in parallel.

Example 1: Transform and traverse collections, standard idiom
val oomItemIdBatch = (1 to 10).toList.map(_.toString).grouped(2).toList
val future = { // necessary for pasting into repl
  for {
    oomItem <- {
      println("Reading...")
      oomItemIdBatch
        // Serially perform read of each batch
        .foldLeft(Future.successful(List[Item]())) { (facc, idBatch) =>
          for {
            acc <- facc
            // Parallel read batch
            oomItem <- Future.sequence(idBatch.map(read))
          } yield acc ::: oomItem
        }
    }
    _ = println("Computing...")
    oomNewItemBatch = oomItem.map(item => item.copy(value = item.value + 1)).grouped(2).toList
    oomResult <- {
      println("Writing...")
      oomNewItemBatch
        // Serially perform write of each batch
        .foldLeft(Future.successful(List[Boolean]())) { (facc, itemBatch) =>
          for {
            acc <- facc
            // Parallel write batch
            oomResult <- Future.sequence(itemBatch.map(item => write(item.id, item)))
          } yield acc ::: oomResult
        }
    }
  } yield oomResult.forall(_ == true)
}

The same code, rewritten using async and async.par:

Example 2: Using async and async.par to transform and traverse collections:
val oomItemIdBatch = (1 to 10).toList.map(_.toString).grouped(2).toList
val future = { // necessary for pasting into repl
  for {
    oomItem <- {
      println("Reading...")
      oomItemIdBatch.async.flatMap(_.async.par.map(read))
    }
    _ = println("Computing...")
    oomNewItemBatch = oomItem.map(item => item.copy(value = item.value + 1)).grouped(2).toList
    oomResult <- {
      println("Writing...")
      oomNewItemBatch.async.flatMap(_.async.par.map(item => write(item.id, item)))
    }
  } yield oomResult.forall(_ == true)
}

Limiting the maximum number of simultaneous workers

async.par allows specifying the maximum number of simultaneous workers used during an asynchronous task. In the following example, batches are processed in parallel with at most two workers, while each identifier within a batch is processed with at most four workers.

Example 3: Using s_mach.concurrent workers to transform and traverse collections:
val oomItemIdBatch = (1 to 10).toList.map(_.toString).grouped(2).toList
val future = { // necessary for pasting into repl
  for {
    oomItem <- {
      println("Reading...")
      oomItemIdBatch.async.par(2).flatMap(_.async.par(4).map(read))
    }
    _ = println("Computing...")
    oomNewItemBatch = oomItem.map(item => item.copy(value = item.value + 1)).grouped(2).toList
    oomResult <- {
      println("Writing...")
      oomNewItemBatch.async.par(2).flatMap(_.async.par(4).map(item => write(item.id, item)))
    }
  } yield oomResult.forall(_ == true)
}

Adding progress reporting, retry and throttle control to asynchronous tasks

async and async.par can optionally be configured to report progress, retry failures and/or throttle iteration to at most one per given time period for asynchronous tasks. In the following example, progress is reported every second and batches are throttled to start no faster than one every three seconds. For each identifier read that fails, up to the first three TimeoutExceptions or SocketTimeoutExceptions are retried. All other exceptions cause the entire task to fail.

Example 4: Adding progress reporting, retry and throttle control to collection concurrent operations
val oomItemIdBatch = (1 to 10).toList.map(_.toString).grouped(2).toList
val future = { // necessary for pasting into repl
  for {
    oomItem <- {
      println("Reading...")
      oomItemIdBatch
        .async
        .progress(1.second)(progress => println(progress))
        .throttle(3.seconds)
        .flatMap { batch =>
          batch
            .async.par
            // Retry at most first 3 timeout and socket exceptions after delaying 100 milliseconds
            .retry {
              case (_: TimeoutException) :: tail if tail.size < 3 =>
                Future.delayed(100.millis)(true)
              case (_: SocketTimeoutException) :: tail if tail.size < 3 =>
                Future.delayed(100.millis)(true)
              case _ => false.future
            }
            .map(read)
        }
    }
    _ = println("Computing...")
    oomNewItemBatch = oomItem.map(item => item.copy(value = item.value + 1)).grouped(2).toList
    oomResult <- {
      println("Writing...")
      oomNewItemBatch.async.par(2).flatMap(_.async.par(4).map(item => write(item.id, item)))
    }
  } yield oomResult.forall(_ == true)
}

async.par workflow for fixed size heterogeneous asynchronous tasks

When first using Future with a for-comprehension, it is natural to assume the following will produce parallel operation:

Example 5: Does not execute futures in parallel
for {
  i1 <- read("1")
  i2 <- read("2")
  i3 <- read("3")
} yield (i1,i2,i3)

Sadly, this code will compile and run just fine, but it will not execute in parallel: each generator waits for the previous future to complete before the next call to read even starts. To correctly implement parallel operation, the following standard pattern is used:

Example 6: Correct Future parallel operation:
val f1 = read("1")
val f2 = read("2")
val f3 = read("3")
val future = { // necessary for pasting into repl
  for {
    i1 <- f1
    i2 <- f2
    i3 <- f3
  } yield (i1,i2,i3)
}

For parallel operation, all of the futures must be started before the for-comprehension. A for-comprehension is a monadic workflow, which captures commands that must take place in a specific sequential order. The pattern in Example 6 is necessary because Scala lacks an applicative workflow, which captures commands that may be run in any order. s_mach.concurrent adds the async.par.run workflow, an applicative workflow specifically for fixed size heterogeneous asynchronous tasks, which expresses the pattern above more concisely.

In the example below, all futures are started at the same time by async.par.run, which returns a Future[(Item,Item,Item)] that completes once all supplied futures complete. After the returned future completes, the tuple of results can be extracted using normal Scala idioms.

Example 7: async.par.run workflow
for {
  (i1,i2,i3) <- async.par.run(read("1"), read("2"), read("3"))
} yield (i1,i2,i3)

Additionally, all of the configuration options available for collection.async.par are valid for async.par.run. In the example below, the number of workers is limited to two, progress is reported once a second and certain failures are retried.

Example 8: async.par.run workflow with two workers, progress reporting and failure retry
for {
  (i1,i2,i3) <-
    async
      .par(2)
      .progress(1.second)(progress => println(progress))
      .retry {
        case (_: TimeoutException) :: tail if tail.size < 3 =>
          Future.delayed(100.millis)(true)
        case (_: SocketTimeoutException) :: tail if tail.size < 3 =>
          Future.delayed(100.millis)(true)
        case _ => false.future
      }
      .run(
        read("1"),
        read("2"),
        read("3")
      )
} yield (i1,i2,i3)

Under the hood: Merge function

The async and async.par primitives utilize the merge and flatMerge sequencing functions to ensure that execution ends immediately once a failure occurs. This is in contrast to Future.sequence which may not always fail immediately when a failure occurs.

The merge function performs the same role as Future.sequence (it calls Future.sequence internally) but ensures that the returned future completes immediately after an exception occurs in any of the futures. Because Future.sequence waits on all futures in left-to-right order before completing, an exception thrown at the beginning of the computation by a future at the far right is not detected until all other futures have completed. For long running computations, this can mean significant time wasted waiting on futures whose results will be discarded.

Additionally, while the Scala parallel collections correctly handle multiple parallel exceptions, Future.sequence returns only the first exception encountered and discards the rest. The merge and flatMerge methods fix these problems by throwing AsyncParThrowable, which has members that provide access to both the first exception thrown and a future of all exceptions thrown during the computation.

Example 9: Future.sequence gets stuck waiting on longRead to complete and only returns the first exception:
scala> val t = Future.sequence(Vector(longRead("1"),readFail("2"),readFail("3"),read("4"))).getTry
3
4
2
1
t: scala.util.Try[scala.collection.immutable.Vector[Item]] = Failure(java.lang.RuntimeException: 2)

scala>

Example 10: The merge method fails immediately on the first exception and throws AsyncParThrowable, which can retrieve all exceptions:
scala> val t = Vector(longRead("1"),readFail("2"),readFail("3"),read("4")).merge.getTry
2
t: scala.util.Try[scala.collection.immutable.Vector[Item]] = Failure(AsyncParThrowable(java.lang.RuntimeException: 2))
3

scala> 4
1

scala> val allFailures = t.failed.get.asInstanceOf[AsyncParThrowable].allFailure.get
allFailures: Vector[Throwable] = Vector(java.lang.RuntimeException: 2, java.lang.RuntimeException: 3)
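
In non-interactive code, the same information can be recovered programmatically. Below is a minimal sketch that relies only on the merge method and the allFailure member demonstrated above (treated as a Future[Vector[Throwable]], per Example 10); the recovery strategy itself is illustrative:

val future =
  Vector(longRead("1"), readFail("2"), readFail("3"), read("4"))
    .merge
    .recoverWith { case t: AsyncParThrowable =>
      // Log every failure, not just the first, then propagate the original
      t.allFailure.flatMap { failures =>
        failures.foreach(f => println(s"failed: $f"))
        Future.failed(t)
      }
    }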
