并发编程(十九):CompletableFuture

Java 在 1.8 版本提供了 CompletableFuture 来支持异步编程,CompletableFuture 有可能是你见过的最复杂的工具类了,不过功能也着实让人感到震撼

1.CompletableFuture的核心优势

以下图烧水泡茶为例子:

image-20210818112811973
image-20210818112811973

其中线程T1执行洗水壶->烧开水的任务,线程T2执行洗茶壶->洗茶杯->拿茶叶的任务,T3等T1、T2执行完成后,执行泡茶的任务,代码实现如下:

下述代码你会发现,CompletableFuture的优点如下:

  1. 无需手工维护线程,也就是无需手动创建线程等;
  2. 语义更清晰,例如 f3 = f1.thenCombine(f2, ()->{}) 能够清晰地表述“任务 3 要等待任务 1 和任务 2 都完成后才能开始”;
  3. 代码更简练并且专注于业务逻辑,几乎所有代码都是业务逻辑相关的。
// 任务 1:洗水壶 -> 烧开水
CompletableFuture<Void> f1 = 
  CompletableFuture.runAsync(()->{
  System.out.println("T1: 洗水壶...");
  sleep(1, TimeUnit.SECONDS);
 
  System.out.println("T1: 烧开水...");
  sleep(15, TimeUnit.SECONDS);
});
// 任务 2:洗茶壶 -> 洗茶杯 -> 拿茶叶
CompletableFuture<String> f2 = 
  CompletableFuture.supplyAsync(()->{
  System.out.println("T2: 洗茶壶...");
  sleep(1, TimeUnit.SECONDS);
 
  System.out.println("T2: 洗茶杯...");
  sleep(2, TimeUnit.SECONDS);
 
  System.out.println("T2: 拿茶叶...");
  sleep(1, TimeUnit.SECONDS);
  return " 龙井 ";
});
// 任务 3:任务 1 和任务 2 完成后执行:泡茶
CompletableFuture<String> f3 = 
  f1.thenCombine(f2, (__, tf)->{
    System.out.println("T1: 拿到茶叶:" + tf);
    System.out.println("T1: 泡茶...");
    return " 上茶:" + tf;
  });
// 等待任务 3 执行结果
System.out.println(f3.join());
 
void sleep(int t, TimeUnit u) {
  try {
    u.sleep(t);
  }catch(InterruptedException e){}
}
// 一次执行结果:
T1: 洗水壶...
T2: 洗茶壶...
T1: 烧开水...
T2: 洗茶杯...
T2: 拿茶叶...
T1: 拿到茶叶: 龙井
T1: 泡茶...
上茶: 龙井
Java

2.CompletabelFuture的使用

2.1.对象的创建

Runnable 接口的 run() 方法没有返回值,而 Supplier 接口的 get() 方法是有返回值的

// 使用默认线程池
static CompletableFuture<Void> runAsync(Runnable runnable)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
// 可以指定线程池  
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)  
Java

默认情况下 CompletableFuture 会使用公共的 ForkJoinPool 线程池,这个线程池默认创建的线程数是 CPU 的核数(也可以通过 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 来设置 ForkJoinPool 线程池的线程数)

创建完 CompletableFuture 对象之后,会自动地异步执行 runnable.run() 方法或者 supplier.get() 方法,因为CompletableFuture实现了Future接口,因此你可以轻易的知道线程什么时候执行完成,并且能够获取到执行结果。

另外,CompletableFuture 类还实现了 CompletionStage 接口

2.2.CompletionStage接口

CompletionStage接口可以清楚的描述任务之间的如下时序关系:串行关系、并行关系、汇聚关系前面提到的 f3 = f1.thenCombine(f2, ()->{}) 描述的就是一种汇聚关系,就是指某个任务需要等待其他一个或多个任务执行完成,才会执行,其中又包括AND汇聚关系、OR汇聚关系,其中AND代表要等待多个任务都完成,OR指只需其中某一个即可。其职责可总结如下:

  • 描述任务时序关系
  • 异常处理

2.2.1.描述串行关系

串行关系的描述主要是如下四个类型的接口: thenApply、thenAccept、thenRun 和 thenCompose

  • thenApply 系列方法入参fn是 Function<T, R>类型的接口,因此支持入参与返回值,所以 thenApply 系列方法返回的是CompletionStage<R>
  • thenAccept系列方法入参consumer是Consumer类型的接口,只支持入参不支持返回值,所以 thenAccept 系列方法返回的是CompletionStage<Void>
  • thenRun系列方法入参action是Runnable类型的接口,因此 既不能接收参数也不支持返回值,所以 thenRun 系列方法返回的也是CompletionStage<Void>
    • 方法里面 Async 代表的是异步执行 fn、consumer 或者 action,其中thenCompose 系列方法,这个系列的方法会新创建出一个子流程,最终结果和 thenApply 系列是相同的
CompletionStage<R> thenApply(fn);
CompletionStage<R> thenApplyAsync(fn);
CompletionStage<Void> thenAccept(consumer);
CompletionStage<Void> thenAcceptAsync(consumer);
CompletionStage<Void> thenRun(action);
CompletionStage<Void> thenRunAsync(action);
CompletionStage<R> thenCompose(fn);
CompletionStage<R> thenComposeAsync(fn);
Java

通过下面的示例代码,你可以看一下 thenApply() 方法是如何使用的

CompletableFuture<String> f0 = 
  CompletableFuture.supplyAsync(
    () -> "Hello World")      //①
  .thenApply(s -> s + " QQ")  //②
  .thenApply(String::toUpperCase);//③
 
System.out.println(f0.join());
// 输出结果
HELLO WORLD QQ

2.2.2. 描述 AND 汇聚关系

CompletionStage 接口里面描述 AND 汇聚关系,主要是 thenCombine、thenAcceptBoth 和 runAfterBoth 系列的接口,这些接口的区别也是源自 fn、consumer、action 这三个核心参数不同。它们的使用你可以参考上面烧水泡茶的实现程序,这里就不赘述了。

CompletionStage<R> thenCombine(other, fn);
CompletionStage<R> thenCombineAsync(other, fn);
CompletionStage<Void> thenAcceptBoth(other, consumer);
CompletionStage<Void> thenAcceptBothAsync(other, consumer);
CompletionStage<Void> runAfterBoth(other, action);
CompletionStage<Void> runAfterBothAsync(other, action);
Java

2.2.3. 描述 OR 汇聚关系

CompletionStage 接口里面描述 OR 汇聚关系,主要是 applyToEither、acceptEither 和 runAfterEither 系列的接口,这些接口的区别也是源自 fn、consumer、action 这三个核心参数不同。

CompletionStage applyToEither(other, fn);
CompletionStage applyToEitherAsync(other, fn);
CompletionStage acceptEither(other, consumer);
CompletionStage acceptEitherAsync(other, consumer);
CompletionStage runAfterEither(other, action);
CompletionStage runAfterEitherAsync(other, action);
Java

下面的示例代码展示了如何使用 applyToEither() 方法来描述一个 OR 汇聚关系。

CompletableFuture<String> f1 = 
  CompletableFuture.supplyAsync(()->{
    int t = getRandom(5, 10);
    sleep(t, TimeUnit.SECONDS);
    return String.valueOf(t);
});
 
CompletableFuture<String> f2 = 
  CompletableFuture.supplyAsync(()->{
    int t = getRandom(5, 10);
    sleep(t, TimeUnit.SECONDS);
    return String.valueOf(t);
});
 
CompletableFuture<String> f3 = 
  f1.applyToEither(f2,s -> s);
 
System.out.println(f3.join());
Java

2.2.4. 异常处理

虽然上面我们提到的 fn、consumer、action 它们的核心方法都不允许抛出可检查异常,但是却无法限制它们抛出运行时异常,例如下面的代码,执行 7/0 就会出现除零错误这个运行时异常。非异步编程里面,我们可以使用 try{}catch{}来捕获并处理异常,那在异步编程里面,异常该如何处理呢?

CompletableFuture<Integer> 
  f0 = CompletableFuture.
    .supplyAsync(()->(7/0))
    .thenApply(r->r*10);
System.out.println(f0.join());
Java

CompletionStage 接口给我们提供的方案非常简单,比 try{}catch{}还要简单,下面是相关的方法,使用这些方法进行异常处理和串行操作是一样的,都支持链式编程方式。

CompletionStage exceptionally(fn);CompletionStage<R> whenComplete(consumer);CompletionStage<R> whenCompleteAsync(consumer);CompletionStage<R> handle(fn);CompletionStage<R> handleAsync(fn);
Java

下面的示例代码展示了如何使用 exceptionally() 方法来处理异常,exceptionally() 的使用非常类似于 try{}catch{}中的 catch{},但是由于支持链式编程方式,所以相对更简单。既然有 try{}catch{},那就一定还有 try{}finally{},whenComplete() 和 handle() 系列方法就类似于 try{}finally{}中的 finally{},无论是否发生异常都会执行 whenComplete() 中的回调函数 consumer 和 handle() 中的回调函数 fn。whenComplete() 和 handle() 的区别在于 whenComplete() 不支持返回结果,而 handle() 是支持返回结果的。

CompletableFuture<Integer>   f0 = CompletableFuture    .supplyAsync(()->7/0))    .thenApply(r->r*10)    .exceptionally(e->0);System.out.println(f0.join());
Java

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×