Coder Social home page Coder Social logo

traneio / future Goto Github PK

View Code? Open in Web Editor NEW
231.0 16.0 18.0 935 KB

High-performance Future implementation for the JVM

Home Page: http://trane.io

License: Apache License 2.0

Java 93.66% Shell 1.22% Scala 5.12%
java8 non-blocking futures jvm asynchronous

future's Introduction

Trane.io Future

Build Status Code Coverage Tech Debt Maven Central Javadocs Join the chat at https://gitter.im/traneio/future

This project is a high-performance implementation of the Future abstraction. The library was designed from scratch with a focus on reducing CPU usage and memory footprint, and having the Twitter Future as its main inspiration.

It allows the user to express complex asynchronous code in a composable and type-safe manner. It also supports more advanced features that are currently only available for Twitter's Future and are essential to developing non-trivial systems. Namely, it provides Locals, that are similar to ThreadLocals but for asynchronous code, and interrupts, also known as cancellations.

The current version has only one implementation in Java, but the intent is to create modules for other JVM languages to make the API idiomatic in each language.

Getting started

The library binaries are distributed through maven central. Click on the maven central badge for information on how to add the library dependency to your project:

Maven Central

This project does not have a mailing list; please use our gitter channel:

Join the chat at https://gitter.im/traneio/future

Please refer to the Javadoc for detailed information about the library and its features:

Javadoc

The Future abstraction

Future is an abstraction to deal with asynchronicity without having to use callbacks directly or blocking threads. The primary usage for Futures on the JVM is to perform IO operations, which are asynchronous by nature.

Although most IO APIs return synchronously, they do that by blocking the current Thread. For instance, the thread issues a request to a remote system and then waits until a response comes back. Considering that the JVM uses native threads, it is wasteful to block them since it leads to potential thread starvation and higher garbage collection pressure. It is hard to scale a JVM system vertically if the IO throughput is bounded by the number of threads.

From the user perspective, a Future can be in three states:

  1. Uncompleted
  2. Completed with a value
  3. Completed with an exception

Instead of exposing this state, Future provides combinators to express computations that run once the Future completes. The results of these combinators are Future instances that can be used to perform other transformations, giving the user a powerful tool to express complex chains of asynchronous transformations.

Let's say that we need to call a remote service to get the username given an id:

Future<User> user = userService.get(userId);

It's possible to apply the map transformation that produces a Future for the username string:

Future<String> username = user.map(user -> user.username);

Note that we are using a lambda expression (user -> user.username) that takes a user and returns its username.

Let's say that now we need to call a service to validate the username string. This is the result if we use the map combinator for it:

Future<Future<Boolean>> isValid = 
  username.map(username -> usernameService.isValid(username));

Given that the lambda expression calls another service and returns a Future, the produced result is a nested future (Future<Future<Boolean>>). One alternative to flatten this nested result is using Future.flatten:

Future<Boolean> isValidFlat = Future.flatten(isValid);

There's a convenient combinator called flatMap that applies both map and Future.flatten at once:

Future<Boolean> isValid = 
  username.flatMap(username -> usernameService.isValid(username));

The flatMap combinator is very flexible and comes from the monad abstraction. Although useful, learning monads and category theory is not a requirement to use Futures.

There are many other useful operators to deal with exceptions, collections of futures, and others. For a complete reference, please see the javadocs.

Execution model

Futures are eager by nature. Once a future is created, the asynchronous computation is triggered. For instance, even though these two futures are composed sequentially through the flatMap combinator, they are already running in parallel:

Future<User> user = userService.get(userId);
Future<List<Tweet>> tweets = timelineService.getUserTweets(userId);

Future<Profile> profile = 
  user.flatMap(u ->
    tweets.map(t ->
      new Profile(u, t);
    )
  )

Both calls are issued to the remote service when the futures are created, and the flatMap combinator only uses the Future instances that are already running. If the call to tweetService is inlined within the flatMap lambda body, the tweetService is called only after the userService returns:

Future<User> user = userService.get(userId);

Future<Profile> profile = 
  user.flatMap(u ->
    timelineService.getUserTweets(userId).map(t ->
      new Profile(u, t);
    )
  )

This implementation of Future leverages this behavior to avoid context thread switches. The execution of the asynchronous computation reuses the current thread until it reaches an asynchronous boundary, where it cannot continue executing since it needs to wait for the completion of an asynchronous operation like a remote system call. For instance, this computation runs entirely on the current thread synchronously:

Future.value(1).map(i -> i + 1);

If there's an asynchronous boundary, the future composition is executed until it reaches the boundary:

Future.value(1)
  .map(i -> i + 1) // Runs on the current thread
  .flatMap(i -> callAService(i)); // Issues the request on the current thread

This composition is executed by the current thread and stops at the point where the remote call is issued to the network layer. Once the remote service returns the response, the network layer thread continues the execution of the remaining steps:

Future.value(1)
  .map(i -> i + 1) // Runs on the current thread
  .flatMap(i -> callAService(i)) // Issues the request on the current thread
  .map(i -> i == 10); // Runs on the network thread that satisfies the async boundary

Asynchronous boundaries are defined using Promises:

public Future<Integer> callAService(Integer i) {
  Promise<Integer> p = Promise.apply();
  networkLayer.issueRequest(i).onComplete(i -> p.setValue(i));
  return p;
}

Note that Promise is a Future that provides methods to set its result. They are useful to interact with callback-based APIs like the ones that are typically provided by network libraries. The promise is created and returned synchronously to the caller, but it is pending until the onComplete callback is executed by the network layer.

Using Promises, it is possible to create fully asynchronous code throughout the application stack and never block threads. It is a common misconception that blocking must happen at some layer of the application. For instance, it is possible to satisfy a request to a server and avoid blocking to write the result back to the client using a lambda that captures a reference to the network connection/session. Example:

public void processRequest(Request request, Connection conn) {
  callEndpointMethod(request)
    .onSuccess(result-> conn.writeSuccess(result))
    .onFailure(ex -> conn.writeFailure(ex));
}

Recursive Futures

Given the optimization that this library implements to avoid thread context switch, compositions are not stack-safe by default. It is necessary to wrap recursive computations with a Tailrec call:

public Future<Integer> factorial(Integer i) {
  Tailrec.apply(() ->
    if (i ==0) return Future.value(1);
    else factorial(i - 1).map(j -> i * j);
  )
}

This is just an example, there's no reason to use Futures to implement a factorial function. Requiring the Tailrec call for recursive computations is a reasonable compromise since recursive futures are uncommon.

Even though the computation is wrapped by Tailrec, the execution still leverages the synchronous execution optimizations in batches. It executes the composition synchronously until it reaches the batch size and then uses a Promise to unwind the stack and then run the next batch.

The default batch size is defined by the system property "io.trane.future.defaultBatchSize", which is 512 by default. Alternatively, it is possible to set the batch size when calling Tailrec.apply:

public Future<Integer> factorial(Integer i) {
  Tailrec.apply(1024, () ->
    if (i ==0) return Future.value(1);
    else factorial(i - 1).map(j -> i * j);
  )
}

Note that the first parameter defines the batch size as 1024. Typically, the users do not need to tune this parameter unless a StackOverflowException is thrown or the user wants to increase the batch size for performance reasons. Larger batches tend to improve performance but increase the risk of a StackOverflowException.

Isolating thread pools

It is possible to isolate portions of a Future composition on a separate thread pool:

FuturePool futurePool = FuturePool.apply(Executors.newCachedThreadPool());

Future<List<Token>> user = 
  documentService.get(docId)
    .flatMap(doc -> futurePool.async(tokenize(doc)))

This feature useful to isolate cpu-intensive tasks and blocking operations. Please refer to the Java documentation to decide which type of executor is the best for the kind of task that needs to be performed. For instance, a ForkJoinPool is useful for cpu-intensive tasks, but can't be used for blocking operations.

The FuturePool also has the method isolate that isolates the execution of a Future:

FuturePool futurePool = FuturePool.apply(Executors.newCachedThreadPool());

Future<User> user = futurePool.isolate(userRepo.get(userId));

isolate is just a shortcut for async + Future.flatten.

Locals

It is not possible to use ThreadLocals with Future because the data it holds become invalid when the computation reaches an asynchronous boundary. The thread returns to its thread pool to execute other computations, and the continuations are performed by the thread that sets the result of the Promise.

Locals are a mechanism similar to ThreadLocal, but it has a more flexible scope. For example, this code sets the UserSession local when a request is processed:

public class UserSession {
  public static final Local<UserSession> local= Local.apply();
  // UserSession impl
}

public class MyService {
  public Future<List<Tweet>> getTweetsEndpoint(Request request) {
    UserSession.local.let(
      request.getSession(), 
      () -> tweetRepo.get(request.getUserId())
    );
  }
}

Note that the let method is used to define the local value, execute the function defined by the second parameter, and then set the local to its previous value. It is a convenient method to avoid having to set and clear the value manually:

public class MyService {
  public Future<List<Tweet>> getTweetsEndpoint(Request request) {
    final Optional<UserSessuib> saved = UserSession.local.get();
    UserSession.local.set(Optional.of(request.getSession()));
    try {
      return tweetRepo.get(request.getUserId());
    } finally {
      UserSession.local.set(saved);
    }
  }
}

At any point of the of the request processing, even after asynchronous boundaries, the user session can be accessed. For instance, let's say that tweetRepo uses a TweetStorage that routes the query to a specific database shard based on the user that is requesting the tweet:

public class TweetStorage {
  public Future<RawTweet> getTweet(long tweetId) {
    databaseFor(UserSession.local.get().getUserId()).getTweet(tweetId);
  }
}

This feature is implemented with a ThreadLocal that is saved at the point of an asynchronous boundary as a Promise field and is restored when the Promise is satisfied, flushing its continuations with the original ThreadLocal contents.

Note: This feature does not have the same behavior as Twitter's Local. The ThreadLocal state is captured when a Promise is created, whereas the Twitter's implementation captures the state only when a Promise continuation is created (for instance, map is called on a Promise instance). In practice, most Promise creations are followed by a continuation, so the behavior is usually the same.

Interrupts/cancellations

This feature provides a way to send signals to the current pending Promise given a Future composition. It is a mechanism that enables cancellations. For instance, given this composition that involves an async boundary (userService.get) and a continuation (.map):

Future<String> username = userService.get(userId).map(user -> user.username);

It is possible to raise an interrupt that is received by the userService.get Promise:

username.raise(new TimeoutException);

The Promise created by userService.get can define a custom handler that performs an action in case an interrupt is received.

Promise.apply has overloaded methods that allow the user to set the interrupt handler. This mechanism can be used to cancel requests to remote systems, as Finagle does.

The method interruptible is a shortcut to fail the Promise if it receives any interrupt signal:

Future<String> username = 
  userService.get(userId).interruptible().map(user -> user.username);

username.raise(new TimeoutException);

In this case, even if userService.get does not handle interrupts, the Promise is satisfied with the interrupt exception.

The interrupt propagation happens through pointers from each continuation to its parent that are created automatically by the library. In the previous example, the map continuation has a pointer to the Promise that is pending.

Benchmarks

This library scores better than the main Future implementations available on the JVM in multiple scenarios, both in terms of throughput and memory footprint.

To run the benchmarks, use the run.sh script under the future-benchmark folder. It also outputs results for Java's, Scala's, and Twitter's Future implementations for comparison.

FAQ

Why create a new Future implementation?

This project aims to provide a Future implementation with the following characteristics:

  1. Pure Java implementation without dependencies
  2. Convenient API with combinators for common operations
  3. Local and interrupts support, essential for non-trivial systems
  4. Low CPU usage and memory footprint

Currently, there aren't other Future libraries with this feature set.

Why trane?

The name is in honor of the great saxophonist John Coltrane, also known as Trane (his nickname).

“Invest yourself in everything you do. There's fun in being serious.” -- John Coltrane

Why is it high-performance?

Several techniques were used to optimize this library. For an overview, please refer to CONTRIBUTING.md.

Code of Conduct

Please note that this project is released with a Contributor Code of Conduct. By participating in this project you agree to abide by its terms. See CODE_OF_CONDUCT.md for details.

License

See the LICENSE file for details.

future's People

Contributors

fwbrasil avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

future's Issues

Add some example code will be better .

This template isn't a strict requirement to open issues, but please try to provide as much information as possible.

Version: (e.g. 0.1.0)

Expected behavior

Actual behavior

Steps to reproduce the behavior

Workaround

Benchmarking

Hello,

In benchmarks you have to be really careful about what you're measuring. For example when you’re measuring how fast you can extract a value from a Promise by blocking on its result, using your get implementation, what actually happens there is that your operation is going to be faster than Scala’s Await.result because Scala uses its BlockContext to signal to the underlying thread-pool that a blocking op occurs, so you have extra overhead because Await.result calls blocking, which involves a ThreadLocal.get + an extra closure being created and then executed.
And if it actually runs on a BlockingContext enabled thread-pool (like Scala's global), it also means it’s going to add an extra thread to the pool.

For measuring Monix, as I've said on the channel, by calling runAsync(Scheduler) it gives you back a CancelableFuture, which for actually async results (as those created by Task.create) is powered by an underlying Scala Promise. But this is only for convenience. If you want to measure its actual performance, then you need to work with runAsync(Callback).

First of all you need a callback implementation:

import monix.eval.Callback
import java.util.concurrent.locks.AbstractQueuedSynchronizer
import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.util.{Failure, Success, Try}

final class AwaitableCallback[A] extends Callback[A] {
  /** Keeps track of the completion status:
    *
    *  - 0: initial value, waiting for result
    *  - 1: completed with success
    *  - 2: completed with failure
    */
  private[this] var state = 0
  private[this] var success: A = _
  private[this] var error: Throwable = _
  private[this] val latch = new OneShotLatch

  def onSuccess(value: A): Unit = {
    this.success = value
    this.state = 1
    latch.releaseShared(1)
  }

  def onError(ex: Throwable): Unit = {
    this.error = ex
    this.state = 2
    latch.releaseShared(1)
  }

  def value: Option[Try[A]] =
    state match {
      case 0 => None
      case 1 => Some(Success(success))
      case _ => Some(Failure(error))
    }

  def blockForResult: A = {
    tryAwait(Duration.Inf)
    state match {
      case 1 => success
      case _ => throw error
    }
  }

  private def tryAwait(atMost: Duration): Boolean = {
    import Duration.Undefined
    if (state != 0) true else {
      atMost match {
        case e if e eq Undefined =>
          throw new IllegalArgumentException("cannot wait for Undefined period")
        case Duration.Inf =>
          latch.acquireSharedInterruptibly(1)
        case f: FiniteDuration if f > Duration.Zero =>
          latch.tryAcquireSharedNanos(1, f.toNanos)
        case _ =>
          () // Do nothing
      }

      state != 0
    }
  }

  private final class OneShotLatch extends AbstractQueuedSynchronizer {
    override protected def tryAcquireShared(ignored: Int): Int =
      if (getState != 0) 1 else -1

    override protected def tryReleaseShared(ignore: Int): Boolean = {
      setState(1)
      true
    }
  }
}

We are also going to replace Task.create with Task.unsafeCreate, because Task.create has extra overhead for safety reasons:

def signalValue[A](value: A): Task[A] =
  Task.unsafeCreate { (context, callback) =>
    callback.onSuccess(value)
  }

Speaking of safety, Monix has this feature where it does execution in batches, so it executes multiple cycles on the same thread, but then it jumps threads when its internal counter goes over a threshold. The purpose is to preserve liveness and not keep a thread occupied forever. Such a jump should happen if you do enough flatMap calls in a loop (e.g. over 1024), but for a benchmark there's no point in having such a feature enabled:

import monix.execution.ExecutionModel.SynchronousExecution
import monix.execution.Scheduler.global

implicit val scheduler = global.withExecutionModel(SynchronousExecution)

And then blocking for that result:

val awaitable = new AwaitableCallback[String]
signalValue("hello").runAsync(awaitable)
val result: String = awaitable.blockForResult

Also as a word of warning, this never happens in actual code:

for (int i = 0; i < 100; i++)
  f = f.flatMap(flatMapF);

What happens in actual code are recursive loops:

def loop[A](n: Int, task: Task[A]): Task[A] =
  task.flatMap { a =>
    if (n <= 0) Task.now(a)
    else loop(n-1, task)
  }

And yes, it does make a difference because your sample does more strict evaluation than what happens in practice.

And if you're actually interested in the performance of flatMap, you have to do it enough times to make sure that it dominates the throughput of the benchmark, otherwise with an efficient flatMap you're going to measure how fast you're blocking for a result. So instead of 100, make that 10000 at least.

And another thought: Task makes it possible to describe an entire program and then call runAsync only once at the edges of that program, something that you cannot do with a strict Future/Promise implementation. This means that the performance of Task.runAsync is less relevant than the performance of its flatMap.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.