Coder Social home page Coder Social logo

yunaiv / cffu Goto Github PK

View Code? Open in Web Editor NEW

This project forked from foldright/cffu

1.0 1.0 1.0 1.65 MB

🦝 Java CompletableFuture Fu, aka. CF-Fu, pronounced "Shifu"; include best practice/traps guide and a tiny 0-dependency sidekick library to improve user experience and reduce misuse.

Home Page: https://github.com/foldright/cffu

License: Apache License 2.0

Shell 1.22% Java 72.22% Kotlin 26.56%

cffu's Introduction

🦝 CompletableFuture Fu (CF-Fu)

🚧 项目还在开发中,发布了v0.x版本: Maven Central

工作项列表及其进展,参见 issue 6


Github Workflow Build Status Github Workflow Build Status Codecov Java support Kotlin License Javadocs Maven Central GitHub Releases GitHub Stars GitHub Forks GitHub Issues GitHub Contributors GitHub repo size gitpod: Ready to Code

shifu

如何管理并发执行是个复杂易错的问题,业界有大量的工具、框架可以采用。

并发工具、框架的广度了解,可以看看如《七周七并发模型》、《Java虚拟机并发编程》、《Scala并发编程(第2版)》;更多关于并发主题的书籍参见书单

其中CompletableFuture (CF)有其优点:

  • Java标准库内置
    • 无需额外依赖,几乎总是可用
    • 相信有极高的实现质量
  • 广为人知广泛使用,有一流的群众基础
    • CompletableFuture在2014年发布的Java 8提供,有~10年了
    • CompletableFuture的父接口Future早在2004年发布的Java 5中提供,有~20年了
    • 虽然Future接口不支持 执行结果的异步获取与并发执行逻辑的编排,但也让广大Java开发者熟悉了Future这个典型的概念与工具
  • 功能强大、但不会非常庞大复杂
    • 足以应对日常的业务需求开发
    • 其它的大型并发框架(比如AkkaRxJava)在使用上需要理解的内容要多很多
    • 当然基本的并发关注方面及其复杂性,与具体使用哪个工具无关,都是要理解与注意的
  • 高层抽象
    • 或说 以业务流程的形式表达技术的并发流程
    • 可以不使用繁琐易错的基础并发协调工具,如CountDownLatch、锁(Lock)、信号量(Semaphore

和其它并发工具、框架一样,CompletableFuture 用于

  • 并发执行业务逻辑,或说编排并发的处理流程/处理任务
  • 利用多核并行处理
  • 提升业务响应性

值得更深入了解和应用。 💕



🎯 〇、目标

  • 作为文档库(即CompletableFuture Guide):
    • 完备说明CompletableFuture的使用方式
    • 给出 最佳实践建议 与 使用陷阱注意
    • 期望在业务中,更有效安全地使用CompletableFuture
  • 作为代码库(即cffu库):
    • 补齐在业务使用中CompletableFuture所缺失的功能
    • 期望在业务中,更方便自然地使用CompletableFuture

🦮 一、CompletableFuture Guide

为了阅读的简洁方便,后文CompletableFuture会简写成CF

🔠 CF并发执行的描述及其用语

cf-graph

基本概念与术语:

  • 任务(Task)/ 计算(Computation
    • 任务逻辑(Task Logic)/ 业务逻辑(Biz Logic
    • 执行(Execute)任务
  • 状态(State
    • 运行中(Running〚1〛
    • 取消(Cancelled〚2〛
    • 完成(Completed / Done
      • 成功(Success / Successful)/ 正常完成(Completed Normally)/ 成功完成(Completed Successfully
      • 失败(Failed / Fail)/ 异常完成(Completed Exceptionally
  • 状态转变(Transition
    • 事件(Event)、触发(Trigger
  • 业务流程(Biz Flow)、CF链(Chain
    • 流程图(Flow Graph)、有向无环图 / DAG
      • 为什么构建的CF链一定是DAG
    • 流程编排(Flow Choreography
  • 前驱(Predecessor)/ 后继(Successor
    • 上游任务 / 前驱任务 / Dependency Task(我依赖的任务)
    • 下游任务 / 后继任务 / Dependent Task(依赖我的任务)

注:上面用/隔开的多个词是,在表述CF同一个概念时,会碰到的多个术语;在不影响理解的情况下,后文会尽量统一用第一个词来表达。

task stauts transition

更多说明:

  • 〚1〛 任务状态有且有只有 运行中(Running)、取消(Cancelled)、完成(Completed)这3种状态。
    • 对于「完成」状态,进一步可以分成 成功(Success)、失败(Failed)2种状态。
  • 所以也可以说,任务状态有且只有 运行中、取消、成功、失败 这4种状态。
    • 右图是任务的状态及其转变图。
    • 在概念上CF的状态转变只能是单次单向的,这很简单可靠、也容易理解并和使用直觉一致。
    • 注:虽然下文提到的obtrudeValue()/obtrudeException方法可以突破CF概念上的约定,但这2个后门方法在正常设计实现中不应该会用到,尤其在业务使用应该完全忽略;带来的问题也由使用者自己了解清楚并注意。

  • 〚2〛 关于「取消」状态:
  • 对于「取消」状态,或说设置了「CancellationException」失败异常的CompletableFuture cf,相比其它异常失败 / 设置了其它失败异常 的情况,不一样的地方:
    • 调用cf.get() / cf.get(timeout, unit)方法
      • 会抛出CancellationException异常
      • 其它异常失败时,这2个方法抛出的是包了一层的ExecutionExceptioncause是实际的失败异常
    • 调用cf.join() / cf.getNow(valueIfAbsent)方法
      • 会抛出CancellationException异常
      • 其它异常失败时,这2个方法抛出的是包了一层的CompletionExceptioncause是实际的失败异常
    • 调用cf.exceptionNow()方法
      • 会抛出IllegalStateException,而不是返回cf所设置的CancellationException异常
      • 其它异常失败时,exceptionNow()返回设置的异常
    • 调用cf.isCancelled()方法
      • 返回true
      • 其它异常失败时,isCancelled()返回false
  • 其它地方,CancellationException异常与其它异常是一样处理的。比如:
    • 调用cf.resultNow()方法
      都是抛出IllegalStateException异常
    • 调用cf.isDone()cf.isCompletedExceptionally()
      都是返回true
    • CompletionStage接口方法对异常的处理,如
      cf.exceptionally()的方法参数Function<Throwable, T>所处理的都是直接设置的异常对象没有包装过

🕹️ CF并发执行的关注方面

CF任务执行/流程编排,即执行提交的代码逻辑/计算/任务,涉及下面4个方面:

  • 任务的输入输出
    • CF所关联任务的输入参数/返回结果(及其数据类型)
  • 任务的调度,即在哪个线程来执行任务。可以是
    • 在触发的线程中就地连续执行任务
    • 在指定Executor(的线程)中执行任务
  • 任务的错误处理(任务运行出错)
  • 任务的超时控制
    • 超时控制是并发的基础关注方面之一
    • 到了Java 9提供了内置支持,新增了completeOnTimeout(...)/orTimeout(...)方法

本节「并发关注方面」,会举例上一些CF方法名,以说明CF方法的命名模式;
可以先不用关心方法的具体功能,在「CF的功能介绍」中会分类展开说明CF方法及其功能。

1. 输入输出

对应下面4种情况:

  • 无输入无返回(00)
    • 对应Runnable接口(包含单个run方法)
  • 无输入有返回(01)
    • 对应Supplier<O>接口(包含单个supply方法)
  • 有输入无返回(10)
    • 对应Consumer<I>接口(包含单个accept方法)
  • 有输入有返回(11)

注:

  • 对于有输入或返回的接口(即除了Runnable接口)
    • 都是泛型的,所以可以支持不同的具体数据类型
    • 都是处理单个输入数据
    • 如果要处理两个输入数据,即有两个上游CF的返回,会涉及下面的变体接口
  • 对于有输入接口,有两个输入参数的变体接口:

CF通过其方法名中包含的用词来体现:

  • run:无输入无返回(00)
    • 即是Runnable接口包含的run方法名
    • 相应的CF方法名的一些例子:
      • runAsync(Runnable runnable)
      • thenRun(Runnable action)
      • runAfterBoth(CompletionStage<?> other, Runnable action)
      • runAfterEitherAsync(CompletionStage<?> other, Runnable action)
  • supply:无输入有返回(01)
    • 即是Supplier接口包含的supply方法名
    • 相应的CF方法名的一些例子:
      • supplyAsync(Supplier<U> supplier)
      • supplyAsync(Supplier<U> supplier, Executor executor)
  • accept:有输入无返回(10)
    • 即是Consumer接口包含的accept方法名
    • 相应的CF方法名的一些例子:
      • thenAccept(Consumer<T> action)
      • thenAcceptAsync(Consumer<T> action)
      • thenAcceptBoth(CompletionStage<U> other, BiConsumer<T, U> action)
      • acceptEitherAsync(CompletionStage<T> other, Consumer<T> action)
  • apply:有输入有返回(11)
    • 即是Function接口包含的apply方法名。CF的方法如
    • 相应的CF方法名的一些例子:
      • thenApply(Function<T, U> fn)
      • thenApplyAsync(Function<T, U> fn)
      • applyToEither(CompletionStage<T> other, Function<T, U> fn)

2. 调度

任务调度是指,任务在哪个线程执行。有2种方式:

  • 在触发的线程中就地连续执行任务
  • 在指定Executor(的线程)中执行任务

CF通过方法名后缀Async来体现调度方式:

  • 有方法名后缀Async
    • 在触发CF后,任务在指定Executor执行
      • 如果不指定executor参数,缺省是ForkJoinPool.commonPool()
    • 相应的CF方法名的一些例子:
      • runAsync(Runnable runnable)
      • thenAcceptAsync(Consumer<T> action, Executor executor)
      • runAfterBothAsync(CompletionStage<?> other, Runnable action)
  • 无方法名后缀Async
    • 任务在触发线程就地连续执行
    • 相应的CF方法名的一些例子:
      • thenAccept(Consumer<T> action)
      • thenApply(Function<T, U> fn)
      • applyToEither(CompletionStage<T> other, Function<T, U> fn)

3. 错误处理

提交给CF的任务可以运行出错(抛出异常),即状态是失败(Failed)或取消(Cancelled)。

对于直接读取结果的方法:

  • 读取 成功结果的方法,如 cf.get()cf.join()会抛出异常(包装的异常)来反馈
  • 读取 失败结果的方法,如 cf.exceptionNow()会返回结果异常或是抛出异常来反馈

对于CompletionStage接口中编排执行的方法,会根据方法的功能 是只处理成功结果或失败结果一者,或是同时处理成功失败结果二者。如

  • exceptionally(...)只处理 失败结果
  • whenComplete(...)/handle(...)同时处理 成功与失败结果;
    • 这2个方法的参数lamdbaBiConsumer/BiFunction)同时输入成功失败结果2个参数:valueexception
  • 其它多数的方法只处理 成功结果
  • 对于不处理的结果,效果上就好像
    没有调用这个CompletionStage方法一样,即短路bypass了 👏

4. 任务执行的超时控制

超时控制是并发的基础关注方面之一。

到了Java 9提供了内置支持,新增了completeOnTimeout(...)/orTimeout(...)方法。

CF的超时控制,在实现上其实可以看成是CF的使用方式,并不是CF要实现基础能力;即可以通过其它已有的CF功能,在CF外围实现。

🔧 CF的功能介绍 | 💪 CF方法分类说明

见子文档页 cf-functions-intro.md

CF的方法个数比较多,所以介绍内容有些多,内容继续完善中… 💪 💕

📐 CF的设计模式 | 🐻 最佳实践与使用陷阱

见子文档页 cf-design-patterns.md

还没有什么内容,收集思考展开中… 💪 💕

📦 二、cffu

🔧 功能

新功能

  • 支持设置缺省的业务线程池
    • CompletableFuture的缺省线程池是ForkJoinPool.commonPool(),这个线程池差不多CPU个线程,合适执行CPU密集的任务。
    • 对于业务逻辑往往有很多等待操作(如网络IO、阻塞等待),并不是CPU密集的;使用这个缺省线程池ForkJoinPool.commonPool()很危险❗️
      所以每次调用CompletableFuture*async方法时,都传入业务线程池,很繁琐易错 🤯
    • Cffu支持设置缺省的业务线程池,规避上面的繁琐与危险
  • 一等公民支持Kotlin 🍩
  • cffuAllOf方法
    • 运行多个CompletableFuture并返回结果的allOf方法
  • cffuAnyOf方法
    • 返回具体类型的anyOf方法
  • cffuCombine(...)方法
    • 运行多个(2 ~ 5个)不同类型的CompletableFuture,返回结果元组
  • cffuJoin(timeout, unit)方法
    • 支持超时的join的方法;就像cf.get(timeout, unit) 之于 cf.get()
    • CompletableFuture缺少这个功能,cf.join()会「不超时永远等待」很危险❗️

Backport支持Java 8

BackportJava 9+高版本的所有CompletableFuture新功能,在Java 8可以直接使用。

其中重要的Backport功能有:

  • 超时控制:orTimeout(...)/completeOnTimeout(...)方法
  • 延迟执行:defaultExecutor(...)方法
  • 工厂方法:failedFuture(...)/completedStage(...)/failedStage(...)

🌿 业务使用中CompletableFuture所缺失的功能介绍

  • 运行多个CompletableFuture并返回结果的allOf方法:
    • resultAllOf方法,运行多个相同结果类型的CompletableFuture
      • CompletableFuture<List<T>> resultAllOf(CompletableFuture<T>... cfs)
      • CompletableFuture<List<T>> resultAllOf(List<? extends CompletableFuture<T>> cfs)
    • resultOf方法,运行多个不同结果类型的CompletableFuture
      • CompletableFuture<Pair<T1, T2>> resultOf(CompletableFuture<T1> cf1, CompletableFuture<T2> cf2)
      • CompletableFuture<Triple<T1, T2, T3>> resultOf(CompletableFuture<T1> cf1, CompletableFuture<T2> cf2, CompletableFuture<T3> cf3)
  • 具体类型的anyOf方法:
    • 提供的方法:
      • CompletableFuture<T> anyOf(CompletableFuture<T>... cfs)
      • CompletableFuture<T> anyOf(List<? extends CompletableFuture<T>> cfs)
    • CF返回的类型是Object,丢失具体类型:
      • CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

实现所在的类:

🎪 使用示例

import io.foldright.cffu.Cffu;
import io.foldright.cffu.CffuFactory;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static io.foldright.cffu.CffuFactoryBuilder.newCffuFactoryBuilder;


public class Demo {
  private static final ExecutorService myBizThreadPool = Executors.newFixedThreadPool(42);

  // Create a CffuFactory with configuration of the customized thread pool
  private static final CffuFactory cffuFactory = newCffuFactoryBuilder(myBizThreadPool).build();

  public static void main(String[] args) throws Exception {
    // Run in myBizThreadPool
    Cffu<Integer> cf0 = cffuFactory.supplyAsync(() -> 21);

    Cffu<Integer> cf42 = cf0.thenApply(n -> n * 2);

    // Run in myBizThreadPool
    Cffu<Integer> longTaskA = cf42.thenApplyAsync(n -> {
      sleep(1001);
      return n / 2;
    });

    // Run in myBizThreadPool
    Cffu<Integer> longTaskB = cf42.thenApplyAsync(n -> {
      sleep(1002);
      return n / 2;
    });


    Cffu<Integer> finalCf = longTaskA.thenCombine(longTaskB, Integer::sum);

    Integer result = finalCf.get();
    System.out.println(result);

    ////////////////////////////////////////
    // cleanup
    myBizThreadPool.shutdown();
  }

  static void sleep(long ms) {
    try {
      Thread.sleep(ms);
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
  }
}

# 完整可运行的Demo代码参见Demo.java

🔌 Java API Docs

当前版本的Java API文档地址: https://foldright.io/cffu/apidocs/

🍪依赖

For Maven projects:

<dependency>
  <groupId>io.foldright</groupId>
  <artifactId>cffu</artifactId>
  <version>0.9.1</version>
</dependency>

For Gradle projects:

// Gradle Kotlin DSL
implementation("io.foldright:cffu:0.9.1")

// Gradle Groovy DSL
implementation 'io.foldright:cffu:0.9.1'

可以在 central.sonatype.com 查看最新版本与可用版本列表。

👋 ∞、关于库名

cffuCompletableFuture-Fu的缩写;读作C Fu,谐音Shifu/师傅

嗯嗯,想到了《功夫熊猫》里可爱的小浣熊师傅吧~ 🦝

shifu

cffu's People

Contributors

dependabot[bot] avatar oldratlee avatar

Stargazers

 avatar

Watchers

 avatar

Forkers

lypjackson

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.