s_mach.concurrent is an open-source Scala library that provides serial and parallel flow control primitives for asynchronous tasks. An asynchronous task consists of two or more calls to functions that return a future result (A => Future[B]) rather than the result itself (A => B). s_mach.concurrent also provides utility and convenience code for working with scala.concurrent.Future.
- Adds concurrent flow control primitives async and async.par for performing fixed size heterogeneous (tuple) and variable size homogeneous (collection) asynchronous tasks. These primitives:
  - Allow enabling optional progress reporting, failure retry and/or throttle control for asynchronous tasks
  - Ensure proper sequencing of returned futures, e.g. given f: Int => Future[String]:
    - List(1,2,3).async.map(f) returns Future[List[String]]
    - async.par.run(f(1),f(2),f(3)) returns Future[(String,String,String)]
  - Ensure fail-immediate sequencing of future results (see the 'Under the hood: Merge' section for details)
  - Ensure all exceptions generated during asynchronous task processing can be retrieved (Future.sequence returns only the first)
- collection.async and collection.async.par support collection operations such as map, flatMap and foreach on asynchronous functions, i.e. A => Future[B]
- async.par.run(future1, future2, ...) supports running fixed size heterogeneous asynchronous tasks (of up to 22 futures) in parallel
- Adds ScheduledExecutionContext, a Scala interface wrapper for java.util.concurrent.ScheduledExecutorService that provides for scheduling delayed and periodic tasks
- Adds non-blocking concurrent control primitives such as Barrier, Latch, Lock and Semaphore
- Provides convenience methods for writing more readable, concise and DRY concurrent code such as Future.get, Future.toTry and Future.fold
Add to build.sbt:
libraryDependencies += "net.s_mach" %% "concurrent" % "1.0.2"
Note: s_mach.concurrent is currently only compiled for Scala 2.11 (though 2.10.4 support can be added if there is interest)
All code examples assume the following imports:
import scala.util._
import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import s_mach.concurrent._
import s_mach.concurrent.util._
import java.net.SocketTimeoutException // used by the retry examples
implicit val scheduledExecutionContext = ScheduledExecutionContext(2)
case class Item(id: String, value: Int, relatedItemId: String)
def read(id: String) : Future[Item] = Future { Thread.sleep(1000); println(id); Item(id,id.toInt,(id.toInt+1).toString) }
def readFail(id: String) : Future[Item] = Future { Thread.sleep(1000); println(id); throw new RuntimeException(id.toString) }
def longRead(id: String) : Future[Item] = Future { Thread.sleep(2000); println(id); Item(id,id.toInt,(id.toInt+1).toString) }
def write(id: String, item: Item) : Future[Boolean] = Future { Thread.sleep(1000); println(id); true }
def writeFail(id: String, item: Item) : Future[Boolean] = Future { Thread.sleep(1000); println(id); throw new RuntimeException(id.toString) }
A common task when working with futures is transforming or traversing a collection, serially or in parallel, with a function that returns a future. With only a few levels of nesting, the standard idioms for accomplishing this lead to code that is difficult to read. In the following example, a collection of ten identifiers is grouped into batches of two. Batches are read serially, one after another, while the identifiers within each batch are read in parallel. The items are then updated and written back using the same scheme.
val oomItemIdBatch = (1 to 10).toList.map(_.toString).grouped(2).toList
val future = { // necessary for pasting into repl
  for {
    oomItem <- {
      println("Reading...")
      oomItemIdBatch
        // Serially perform read of each batch
        .foldLeft(Future.successful(List[Item]())) { (facc, idBatch) =>
          for {
            acc <- facc
            // Parallel read batch
            oomItem <- Future.sequence(idBatch.map(read))
          } yield acc ::: oomItem
        }
    }
    _ = println("Computing...")
    oomNewItemBatch = oomItem.map(item => item.copy(value = item.value + 1)).grouped(2).toList
    oomResult <- {
      println("Writing...")
      oomNewItemBatch
        // Serially perform write of each batch
        .foldLeft(Future.successful(List[Boolean]())) { (facc, itemBatch) =>
          for {
            acc <- facc
            // Parallel write batch
            oomResult <- Future.sequence(itemBatch.map(item => write(item.id, item)))
          } yield acc ::: oomResult
        }
    }
  } yield oomResult.forall(_ == true)
}
The same code, rewritten using async and async.par:
val oomItemIdBatch = (1 to 10).toList.map(_.toString).grouped(2).toList
val future = { // necessary for pasting into repl
  for {
    oomItem <- {
      println("Reading...")
      // Batches are read serially; ids within a batch are read in parallel
      oomItemIdBatch.async.flatMap(_.async.par.map(read))
    }
    _ = println("Computing...")
    oomNewItemBatch = oomItem.map(item => item.copy(value = item.value + 1)).grouped(2).toVector
    oomResult <- {
      println("Writing...")
      // Batches are written serially; items within a batch are written in parallel
      oomNewItemBatch.async.flatMap(_.async.par.map(item => write(item.id, item)))
    }
  } yield oomResult.forall(_ == true)
}
async.par allows specifying the maximum number of simultaneous workers used during an asynchronous task. In the following example, batches are processed in parallel with at most two workers, while each identifier within a batch is processed with at most four workers.
val oomItemIdBatch = (1 to 10).toList.map(_.toString).grouped(2).toList
val future = { // necessary for pasting into repl
  for {
    oomItem <- {
      println("Reading...")
      oomItemIdBatch.async.par(2).flatMap(_.async.par(4).map(read))
    }
    _ = println("Computing...")
    oomNewItemBatch = oomItem.map(item => item.copy(value = item.value + 1)).grouped(10).toVector
    oomResult <- {
      println("Writing...")
      oomNewItemBatch.async.par(2).flatMap(_.async.par(4).map(item => write(item.id, item)))
    }
  } yield oomResult.forall(_ == true)
}
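For readers curious how a worker limit can be expressed with plain futures, the following is a rough sketch using only the standard library. It is not how async.par(n) is implemented: BoundedParSketch and parMapBounded are hypothetical names, and the approach shown (a fixed number of worker chains draining a shared queue) is just one possible technique.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical helper, not part of s_mach.concurrent
object BoundedParSketch {
  // Maps `f` over `items` with at most `maxWorkers` futures in flight.
  // Results are returned in the order of `items`.
  def parMapBounded[A, B](items: Vector[A], maxWorkers: Int)(f: A => Future[B])(
    implicit ec: ExecutionContext
  ): Future[Vector[B]] = {
    require(maxWorkers > 0, "maxWorkers must be positive")
    // Shared work queue of (item, original index) pairs
    val queue = new ConcurrentLinkedQueue[(A, Int)]()
    items.zipWithIndex.foreach(pair => queue.add(pair))

    // Each worker takes the next item only after its previous future
    // completes, so one worker never has more than one future in flight
    def worker(acc: List[(Int, B)]): Future[List[(Int, B)]] =
      Option(queue.poll()) match {
        case None              => Future.successful(acc)
        case Some((item, idx)) => f(item).flatMap(b => worker((idx, b) :: acc))
      }

    val workers = Vector.fill(math.min(maxWorkers, items.size))(worker(Nil))
    // Note: Future.sequence is used here, so this sketch is NOT fail-immediate
    Future.sequence(workers).map(_.flatten.sortBy(_._1).map(_._2))
  }

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    val doubled = parMapBounded((1 to 6).toVector, 2)(i => Future(i * 2))
    println(Await.result(doubled, 5.seconds))
  }
}
```

If one call fails, the remaining workers in this sketch keep draining the queue, which is one reason a library-level primitive is preferable to hand-rolled code.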
async and async.par can optionally be configured to report progress, retry failures and/or throttle asynchronous tasks to a minimum time period per iteration. In the following example, progress is reported as batches complete (at most once per second) and batches may not complete faster than one every three seconds. For each identifier whose read fails, the first three TimeoutExceptions or SocketTimeoutExceptions are retried after a 100 millisecond delay. All other exceptions cause the entire task to fail.
val oomItemIdBatch = (1 to 10).toList.map(_.toString).grouped(2).toList
val future = { // necessary for pasting into repl
  for {
    oomItem <- {
      println("Reading...")
      oomItemIdBatch
        .async
        .progress(1.second)(progress => println(progress))
        .throttle(3.seconds)
        .flatMap { batch =>
          batch
            .async.par
            // Retry at most the first 3 timeout or socket timeout exceptions,
            // delaying 100 milliseconds before each retry
            .retry {
              case (_: TimeoutException) :: tail if tail.size < 3 =>
                Future.delayed(100.millis)(true)
              case (_: SocketTimeoutException) :: tail if tail.size < 3 =>
                Future.delayed(100.millis)(true)
              case _ => false.future
            }
            .map(read)
        }
    }
    _ = println("Computing...")
    oomNewItemBatch = oomItem.map(item => item.copy(value = item.value + 1)).grouped(10).toVector
    oomResult <- {
      println("Writing...")
      oomNewItemBatch.async.par(2).flatMap(_.async.par(4).map(item => write(item.id, item)))
    }
  } yield oomResult.forall(_ == true)
}
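The retry behaviour described above can be approximated with the standard library's recoverWith. The sketch below is an illustration only, not the library's retry implementation: RetrySketch and retryOnTimeout are hypothetical names, only TimeoutException is retried, and no delay is inserted between attempts.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical helper, not part of s_mach.concurrent
object RetrySketch {
  // Runs `task`; if it fails with a TimeoutException and retries remain,
  // runs it again. Any other failure is returned unchanged.
  def retryOnTimeout[A](maxRetries: Int)(task: () => Future[A])(
    implicit ec: ExecutionContext
  ): Future[A] =
    task().recoverWith {
      case _: TimeoutException if maxRetries > 0 =>
        retryOnTimeout(maxRetries - 1)(task)
    }

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    val result = retryOnTimeout(3)(() => Future("read succeeded"))
    println(Await.result(result, 5.seconds))
  }
}
```

The task is passed as a function () => Future[A] rather than as a Future[A] because a future cannot be re-run once started; each retry has to create a new one.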
When first using Future with a for-comprehension, it is natural to assume the following will produce parallel operation:
for {
  i1 <- read("1")
  i2 <- read("2")
  i3 <- read("3")
} yield (i1,i2,i3)
Sadly, this code will compile and run just fine, but it will not execute in parallel: each read is only called after the previous one has completed. To correctly implement parallel operation, the following standard pattern is used:
val f1 = read("1")
val f2 = read("2")
val f3 = read("3")
val future = { // necessary for pasting into repl
  for {
    i1 <- f1
    i2 <- f2
    i3 <- f3
  } yield (i1,i2,i3)
}
For parallel operation, all of the futures must be started before the for-comprehension. The for-comprehension is a monadic workflow, which captures commands that must take place in a specific sequential order. The pattern above is necessary because Scala lacks an applicative workflow, which captures commands that may be run in any order. s_mach.concurrent adds the async.par.run workflow, an applicative workflow specifically for fixed size heterogeneous asynchronous tasks. This workflow can express the pattern above more concisely.
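The difference between the two patterns becomes clearer once the for-comprehension is written out as the flatMap and map calls it expands to. The sketch below uses only the standard library; DesugarSketch, slow, serial, parallel and elapsedMillis are hypothetical names, and slow is a shortened stand-in (200 ms) for the read function used in the examples.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical illustration, not part of s_mach.concurrent
object DesugarSketch {
  // Stand-in for read(): a task that takes roughly 200 ms
  def slow(id: Int): Future[Int] = Future { blocking { Thread.sleep(200) }; id }

  // Expansion of: for { a <- slow(1); b <- slow(2) } yield (a, b)
  // slow(2) is only called inside the callback, after slow(1) has completed
  def serial(): Future[(Int, Int)] =
    slow(1).flatMap(a => slow(2).map(b => (a, b)))

  // Both futures are created, and therefore started, before flatMap is called
  def parallel(): Future[(Int, Int)] = {
    val f1 = slow(1)
    val f2 = slow(2)
    f1.flatMap(a => f2.map(b => (a, b)))
  }

  // Wall-clock time in milliseconds taken to complete the future
  def elapsedMillis[A](f: => Future[A]): Long = {
    val start = System.nanoTime()
    Await.result(f, 5.seconds)
    (System.nanoTime() - start) / 1000000
  }

  def main(args: Array[String]): Unit = {
    println(s"serial:   ${elapsedMillis(serial())} ms")
    println(s"parallel: ${elapsedMillis(parallel())} ms")
  }
}
```

Both versions return the same value; the serial version takes at least the sum of the two delays, while the parallel version should typically take about as long as the slower of the two.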
In the example below, all futures are started at the same time by async.par.run, which returns a Future[(Item,Item,Item)] that completes once all supplied futures complete. After this returned future completes, the tuple values can be extracted using normal Scala idioms.
for {
  (i1,i2,i3) <- async.par.run(read("1"), read("2"), read("3"))
} yield (i1,i2,i3)
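As a point of comparison, a tuple-returning combinator for three futures can be sketched with the standard library's Future.zip. This is only an approximation of the shape of async.par.run, not its implementation: ZipSketch and par3 are hypothetical names, and the sketch has none of the fail-immediate, progress or retry behaviour described in this document.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical helper, not part of s_mach.concurrent
object ZipSketch {
  // The arguments are evaluated before par3 is entered, so all three
  // futures are already running by the time they are combined
  def par3[A, B, C](fa: Future[A], fb: Future[B], fc: Future[C])(
    implicit ec: ExecutionContext
  ): Future[(A, B, C)] =
    fa.zip(fb).zip(fc).map { case ((a, b), c) => (a, b, c) }

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    val combined = par3(Future(1), Future("two"), Future(3.0))
    println(Await.result(combined, 5.seconds))
  }
}
```

Each element keeps its own type in the resulting tuple, which is what distinguishes a heterogeneous (tuple) task from a homogeneous (collection) task.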
Additionally, all of the configuration options available for collection.async.par are valid for async.par.run. In the example below, the number of workers is limited to two, progress is reported once a second and certain failures are retried.
for {
  (i1,i2,i3) <-
    async
      .par(2)
      .progress(1.second)(progress => println(progress))
      .retry {
        case (_: TimeoutException) :: tail if tail.size < 3 =>
          Future.delayed(100.millis)(true)
        case (_: SocketTimeoutException) :: tail if tail.size < 3 =>
          Future.delayed(100.millis)(true)
        case _ => false.future
      }
      .run(
        read("1"),
        read("2"),
        read("3")
      )
} yield (i1,i2,i3)
The async and async.par primitives utilize the merge and flatMerge sequencing functions to ensure that execution ends immediately once a failure occurs. This is in contrast to Future.sequence which may not always fail immediately when a failure occurs.
The merge function performs the same role as Future.sequence (it calls Future.sequence internally), but it ensures that the returned future completes immediately after any of the futures fails. Because Future.sequence waits on the futures in left-to-right order, a failure raised early in the computation by a future at the far right is not detected until every future to its left has completed. For long-running computations, this can mean a significant amount of time wasted waiting on futures whose results will be discarded.
Additionally, while the Scala parallel collections correctly handle multiple parallel exceptions, Future.sequence only returns the first exception encountered; all further exceptions are discarded. The merge and flatMerge methods fix these problems by throwing AsyncParThrowable. AsyncParThrowable has member methods to access both the first exception thrown and a future of all exceptions thrown during the computation.
scala> val t = Future.sequence(Vector(longRead("1"),readFail("2"),readFail("3"),read("4"))).getTry
3
4
2
1
t: scala.util.Try[scala.collection.immutable.Vector[Item]] = Failure(java.lang.RuntimeException: 2)
scala>
scala> val t = Vector(longRead("1"),readFail("2"),readFail("3"),read("4")).merge.getTry
2
t: scala.util.Try[scala.collection.immutable.Vector[Item]] = Failure(AsyncParThrowable(java.lang.RuntimeException: 2))
3
scala> 4
1
scala> val allFailures = t.failed.get.asInstanceOf[AsyncParThrowable].allFailure.get
allFailures: Vector[Throwable] = Vector(java.lang.RuntimeException: 2, java.lang.RuntimeException: 3)
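The fail-immediate behaviour shown in the transcript can be approximated with a Promise that is completed by whichever event happens first: any failure, or the success of Future.sequence. The sketch below uses only the standard library and is not the library's merge implementation; MergeSketch, failFast and allFailures are hypothetical names, and failures are returned as a plain Vector rather than wrapped in AsyncParThrowable.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

// Hypothetical helpers, not part of s_mach.concurrent
object MergeSketch {
  // Like Future.sequence, but fails as soon as any future fails,
  // whatever its position in the collection
  def failFast[A](futures: Vector[Future[A]])(
    implicit ec: ExecutionContext
  ): Future[Vector[A]] = {
    val promise = Promise[Vector[A]]()
    // The first failure to arrive completes the promise; later ones are ignored
    futures.foreach(_.onComplete {
      case Failure(t) => promise.tryFailure(t)
      case Success(_) => ()
    })
    // If nothing fails, Future.sequence supplies the ordered results
    Future.sequence(futures).onComplete(result => promise.tryComplete(result))
    promise.future
  }

  // Waits for every future to finish and collects all failures, in order
  def allFailures[A](futures: Vector[Future[A]])(
    implicit ec: ExecutionContext
  ): Future[Vector[Throwable]] =
    Future.sequence(
      futures.map(f => f.map(_ => Option.empty[Throwable]).recover { case t => Some(t) })
    ).map(_.flatten)

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    val never  = Promise[Int]().future // stands in for a very long read
    val failed = Future.failed[Int](new RuntimeException("2"))
    println(Try(Await.result(failFast(Vector(never, failed)), 1.second)))
  }
}
```

In main, the first future never completes, so Future.sequence alone would never return; failFast completes with the failure as soon as it is observed.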