异步调用

2024-05-29  
0条评论   160浏览

Callable

理论

(1)callable原理分析

Future 接口

当 call()方法完成时,结果必须存储在主线程已知的对象中,以便主线程可以知道该线程返回的结果。为此,可以使用 Future 对象。 将 Future 视为保存结果的对象–它可能暂时不保存结果,但将来会保存(一旦Callable 返回)。Future 基本上是主线程可以跟踪进度以及其他线程的结果的一种方式。要实现此接口,必须重写 5 种方法,这里列出了重要的方法,如下: • public boolean cancel(boolean mayInterrupt):用于停止任务。如果尚未启动,它将停止任务。如果已启动,则仅在mayInterrupt 为 true时才会中断任务。 • public Object get()抛出 InterruptedException,ExecutionException:用于获取任务的结果。如果任务完成,它将立即返回结果,否则将等待任务完成,然后返回结果。

• public boolean isDone():如果任务完成,则返回 true,否则返回 false可以看到 Callable 和 Future 做两件事-Callable 与 Runnable 类似,因为它封装了要在另一个线程上运行的任务,而 Future 用于存储从另一个线程获得的结果。实际上,future 也可以与 Runnable 一起使用。要创建线程,需要 Runnable。为了获得结果,需要 future。

FutureTask

Java 库具有具体的 FutureTask 类型,该类型实现 Runnable 和 Future,并方便地将两种功能组合在一起。 可以通过为其构造函数提供 Callable 来创建FutureTask。然后,将 FutureTask 对象提供给 Thread 的构造函数以创建Thread 对象。因此,间接地使用 Callable 创建线程。

(2)callable应用场景

• 在主线程中需要执行比较耗时的操作时,但又不想阻塞主线程时,可以把这些作业交给 Future 对象在后台完成, 当主线程将来需要时,就可以通过 Future对象获得后台作业的计算结果或者执行状态 • 一般 FutureTask 多用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果 • 仅在计算完成时才能检索结果;如果计算尚未完成,则阻塞 get 方法。一旦计算完成,就不能再重新开始或取消计算。get 方法而获取结果只有在计算完成时获取,否则会一直阻塞直到任务转入完成状态,然后会返回结果或者抛出异常。 • 只计算一次

(3)核心原理:

在主线程中需要执行比较耗时的操作时,但又不想阻塞主线程时,可以把这些作业交给 Future 对象在后台完成 • 当主线程将来需要时,就可以通过 Future 对象获得后台作业的计算结果或者执行状态 • 一般 FutureTask 多用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果。 • 仅在计算完成时才能检索结果;如果计算尚未完成,则阻塞 get 方法 • 一旦计算完成,就不能再重新开始或取消计算 • get 方法而获取结果只有在计算完成时获取,否则会一直阻塞直到任务转入完成状态,然后会返回结果或者抛出异常 • get 只计算一次,因此 get 方法放到最后

语法

与Runnable接口类似,但Callable接口的call()方法可以返回结果或抛出异常。可以通过FutureTask类包装Callable对象,然后传递给Thread类进行启动。

class MyCallable implements Callable<Integer> {
    public Integer call() {
        // 线程执行的逻辑
        return result;
    }
}

FutureTask<Integer> futureTask = new FutureTask<>(new MyCallable());
Thread thread = new Thread(futureTask);
thread.start();

FutureTask

1、理论

  1. 定义与功能:
    • FutureTask是Java并发编程中的一个类,结合了Future和Runnable接口,用于表示一个异步计算的结果。
    • 它允许将耗时的计算任务提交给另一个线程去执行,而当前线程可以继续执行其他任务,从而实现并发执行的效果。
    • FutureTask主要解决异步执行、获取计算结果、异常处理以及任务取消等问题。
  2. 状态管理:
    • FutureTask内部维护了一个状态变量(如state),用于表示任务的不同状态,如未启动、运行中、已完成等。
    • 状态变量的变化通过CAS(Compare-and-Swap)等原子操作来保证线程安全。
  3. 任务执行:
    • FutureTask通常与Callable或Runnable接口一起使用。当使用Callable时,可以获取到任务执行的结果;当使用Runnable时,则无法获取结果。
    • FutureTask实现了Runnable接口,因此可以直接提交给线程池执行,或者由调用线程直接执行(FutureTask.run())。
  4. 结果获取:
    • 通过调用Future.get()方法可以获取异步计算的结果。如果计算还没有完成,get()方法会阻塞调用线程,直到计算完成为止。
    • 如果异步计算抛出了异常,那么调用get()方法时会抛出ExecutionException,从而可以方便地处理异常。
  5. 任务取消:
    • 通过Future.cancel()方法可以取消还没有开始或者正在进行的异步计算。
    • 取消操作的结果取决于任务是否已经启动、是否支持取消以及是否已经执行完成。
  6. 内部实现:
    • FutureTask的实现涉及到了并发编程中的多个关键概念,如原子操作、状态管理、阻塞队列等。
    • 它通过内部维护的状态变量和等待/通知机制来协调生产者和消费者线程之间的交互。
  7. 使用场景:
    • 当需要异步执行某个计算任务,并且需要获取任务执行的结果时,可以使用FutureTask。
    • 它可以提高程序的响应性,使得在等待耗时任务完成的同时,可以继续执行其他任务。

2、使用

public class CompletableFutureDemo
{
    public static void main(String[] args) throws ExecutionException, InterruptedException
    {
        FutureTask<String> futureTask = new FutureTask<>(new MyThread());

        Thread t1 = new Thread(futureTask,"t1");
        t1.start();
        try { TimeUnit.MILLISECONDS.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); }
        System.out.println(futureTask.get());
    }
}

class MyThread implements Callable<String>
{
    @Override
    public String call() throws Exception
    {

        System.out.println("-----come in call() " );
        try { TimeUnit.MILLISECONDS.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); }
        return "hello Callable";
    }
}

3、优缺点

优点:Future+线程池异步多线程任务配合,能显著提高程序的运行效率。

缺点:

  • get()阻塞---一旦调用get()方法求结果,一旦调用不见不散,非要等到结果才会离开,不管你是否计算完成,如果没有计算完成容易程序堵塞。
  • isDone()轮询---轮询的方式会耗费无谓的cpu资源,而且也不见得能及时得到计算结果,如果想要异步获取结果,通常会以轮询的方式去获取结果,尽量不要阻塞。

结论:Future对于结果的获取不是很友好,只能通过阻塞或轮询的方式得到任务的结果。

对于简单的业务场景使用Future完全ok

回调通知:

  • 应对Future的完成时间,完成了可以告诉我,也就是我们的回调通知
  • 通过轮询的方式去判断任务是否完成这样非常占cpu并且代码也不优雅

创建异步任务:Future+线程池组合

  • 多个任务前后依赖可以组合处理(水煮鱼--->买鱼--->调料--->下锅):

    • 想将多个异步任务的结果组合起来,后一个异步任务的计算结果需要钱一个异步任务的值

    • 想将两个或多个异步计算合并成为一个异步计算,这几个异步计算互相独立,同时后面这个又依赖前一个处理的结果

对计算速度选最快的:

  • 当Future集合中某个任务最快结束时,返回结果,返回第一名处理结果

结论

  • 使用Future之前提供的那点API就囊中羞涩,处理起来不够优雅,这时候还是让CompletableFuture以声明式的方式优雅的处理这些需求。
  • 从i到i++
  • Future能干的,CompletableFuture都能干

CompletableFuture(异步回调)

理论

Future

Future是Java5新加的一个接口,它提供一种异步并行计算的功能,如果主线程需要执行一个很耗时的计算任务,我们会就可以通过Future把这个任务放进异步线程中执行,主线程继续处理其他任务或者先行结束,再通过Future获取计算结果。

Futrue 在 Java 里面,通常用来表示一个异步任务的引用,比如我们将任务提交到线程池里面,然后我们会得到一个 Futrue,在 Future 里面有 isDone 方法来 判断任务是否处理结束,还有 get 方法可以一直阻塞直到任务结束然后获取结果,但整体来说这种方式,还是同步的,因为需要客户端不断阻塞等待或者不断轮询才能知道任务是否完成。

Future 缺点

(1)不支持手动完成 我提交了一个任务,但是执行太慢了,我通过其他路径已经获取到了任务结果,现在没法把这个任务结果通知到正在执行的线程,所以必须主动取消或者一直等待它执行完成 (2)不支持进一步的非阻塞调用 通过 Future 的 get 方法会一直阻塞到任务完成,但是想在获取任务之后执行额外的任务,因为 Future 不支持回调函数,所以无法实现这个功能 (3)不支持链式调用 对于 Future 的执行结果,我们想继续传到下一个 Future 处理使用,从而形成一个链式的 pipline 调用,这在 Future 中是没法实现的。 (4)不支持多个 Future 合并 比如我们有 10 个 Future 并行执行,我们想在所有的 Future 运行完毕之后,执行某些函数,是没法通过 Future 实现的。 (5)不支持异常处理 Future 的 API 没有任何的异常处理的 api,所以在异步运行时,如果出了问题是不好定位的。

CompletionStage

用于异步执行中的阶段处理。这个接口定义了一组方法,这些方法使得在一个阶段执行结束后,可以继续执行下一个阶段,或者对结果进行转换以产生新的结果。

具体来说,CompletionStage的接口方法可以分为几类:

基于结果的转换:

  • applycombine 方法允许将上一阶段的结果作为参数传递给指定的函数,并返回新的结果。这些方法在Function或BiFunction类型的接口参数下工作。

基于结果的消费:

  • accept 方法允许将上一阶段的结果作为参数传递给指定的函数,但不返回结果。这种方法在Consumer或BiConsumer类型的接口参数下工作。

不依赖结果的执行:

  • run 方法允许在上一阶段执行结束后,执行指定的操作,但不依赖于一阶段的结果。这个方法的接口参数为Runnable类型。

异步执行:

  • async 方法允许异步执行阶段任务,可以选择指定线程池或不指定。

组合多个阶段:

  • both 方法表示当前两个阶段都完成后才执行下一阶段。
  • either 方法表示当前两个阶段中任一完成后就执行下一阶段。
  • compose 方法基于上阶段的执行完状态,执行下一阶段。

异常处理:

  • handleexceptionally 方法允许基于上阶段的执行完状态和结果,消费其正常或异常结果。

CompletionStage的实现类,如CompletableFuture,扩展了这个接口,提供了更多的功能和灵活性。通过使用CompletionStage和它的实现类,你可以更轻松地创建复杂的异步执行流程和任务组合。

总之,CompletionStage是Java 8中用于处理异步执行阶段和结果的重要接口,它提供了一组强大的方法来定义和管理异步任务之间的关系和转换。

CompletableFuture 简介

CompletableFuture 在 Java 里面被用于异步编程,异步通常意味着非阻塞,可以使得我们的任务单独运行在与主线程分离的其他线程中,并且通过回调可以在主线程中得到异步任务的执行状态,是否完成,和是否异常等信息。CompletableFuture 实现了 Future, CompletionStage 接口,实现了 Future接口就可以兼容现在有线程池框架,而 CompletionStage 接口才是异步编程的接口抽象,里面定义多种异步方法,通过这两者集合,从而打造出了强大的CompletableFuture 类。

CompletableFuture对Future的改进

get()方法在Future计算完成之前会一直处在阻塞状态下,阻塞的方式和异步编程的设计理念相违背。

isDene()方法容易耗费cpu资源(cpu空转)

对于真正的异步处理我们希望是可以通过传入回调函数,在Future结束时自动调用该回调函数,这样,我们就不用等待结果

jdk8设计出CompletableFuture,CompletableFuture提供了一种观察者模式类似的机制,可以让任务执行完成后通知监听的一方。

CompletableFuture和CompletionStage

接口CompletionStage

  • 代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段。
  • 一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发

类CompletableFuture

  • 提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合CompletableFuture的方法
  • 它可能代表一个明确完成的Future,也可能代表一个完成阶段(CompletionStage),它支持在计算完成以后触发一些函数或执行某些动作

语法

Java 8 引入的一个类,用于支持异步编程和构建异步操作的结果。它提供了一种简洁的方式来处理异步任务,并且能够方便地进行任务之间的组合、串行执行、并行执行以及异常处理。

创建 CompletableFuture:

  • static CompletableFuture<Void> runAsync(Runnable runnable):异步执行一个没有返回值的任务。
  • static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier):异步执行一个有返回值的任务。

处理异步结果:

  • CompletableFuture<T> thenApply(Function<? super T,? extends U> fn):当上一个任务完成时,对其结果执行给定的函数。
  • CompletableFuture<Void> thenAccept(Consumer<? super T> action):当上一个任务完成时,对其结果执行给定的操作,但不返回任何结果。
  • CompletableFuture<Void> thenRun(Runnable action):当上一个任务完成时,执行给定的操作,但不接收上一个任务的结果。
  • CompletableFuture<U> thenCompose(Function<? super T,? extends CompletionStage<U>> fn):当上一个任务完成时,对其结果执行给定的函数,返回一个新的 CompletableFuture

组合多个 CompletableFuture:

  • CompletableFuture<U> thenCombine(CompletionStage<? extends V> other, BiFunction<? super T,? super U,? extends V> fn):当当前任务和另一个任务都完成时,对它们的结果执行给定的函数。
  • CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action):当当前任务和另一个任务都完成时,对它们的结果执行给定的操作,但不返回任何结果。
  • CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action):当当前任务和另一个任务都完成时,执行给定的操作,但不接收任何结果。
  • CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action):当任意一个任务完成时,对其结果执行给定的操作,但不返回任何结果。
  • CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action):当任意一个任务完成时,执行给定的操作,但不接收任何结果。

异常处理:

  • CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn):当异常发生时,执行给定的函数。
  • CompletableFuture<T> handle(BiFunction<? super T,Throwable,? extends T> fn):当任务完成时,对其结果执行给定的函数,或者在出现异常时执行另一个函数。

等待所有任务完成:

  • static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs):等待所有 CompletableFuture 都完成后返回一个新的 CompletableFuture

等待任意任务完成:

  • static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs):等待任意一个 CompletableFuture 完成后返回一个新的 CompletableFuture

取消任务:

  • boolean cancel(boolean mayInterruptIfRunning):尝试取消任务的执行。

创建方法

基本使用

//同步调用
CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(()->{
    System.out.println(Thread.currentThread().getName()+" : CompletableFuture1");
});
completableFuture1.get();

//mq消息队列
//异步调用
CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(()->{
    System.out.println(Thread.currentThread().getName()+" : CompletableFuture2");
    //模拟异常
    int i = 10/0;
    return 1024;
});
completableFuture2.whenComplete((t,u)->{
    System.out.println("------t="+t);
    System.out.println("------u="+u);
}).get();

runAsync

方法用于异步地执行一个不返回结果的Runnable任务。

runAsync(Runnable runnable)
  • 接收一个Runnable对象作为参数。
  • 使用ForkJoinPool.commonPool()作为默认的线程池来异步执行这个任务。
  • 返回一个CompletableFuture<Void>,因为Runnable不返回任何值(即返回void)。
runAsync(Runnable runnable, Executor executor)
  • 除了接收一个Runnable对象作为参数外,还接收一个Executor对象来指定执行任务的线程池。
  • 使用指定的Executor来异步执行这个任务。
  • 同样返回一个CompletableFuture<Void>

supplyAsync

方法用于异步地执行一个返回结果的Supplier任务。

supplyAsync(Supplier<U> supplier)
  • 接收一个Supplier对象作为参数,Supplier是一个函数式接口,它的get()方法返回一个结果。
  • 使用ForkJoinPool.commonPool()作为默认的线程池来异步执行这个任务。
  • 返回一个CompletableFuture<U>,其中USupplierget()方法返回的类型。
supplyAsync(Supplier<U> supplier, Executor executor)
  • 除了接收一个Supplier对象作为参数外,还接收一个Executor对象来指定执行任务的线程池。
  • 使用指定的Executor来异步执行这个任务。
  • 返回一个CompletableFuture<U>

对于上述Executor参数说明:若没有指定,则使用默认的ForkJoinPoolcommonPool()作为它的线程池执行异步代码,如果指定线程池,则使用我们自定义的或者特别指定的线程池执行异步代码

public class CompletableFutureBuildDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },executorService);

        System.out.println(completableFuture.get()); //null

        CompletableFuture<String> objectCompletableFuture = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getName());
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello supplyAsync";
        },executorService);

        System.out.println(objectCompletableFuture.get());//hello supplyAsync

        executorService.shutdown();

    }
}

CompletableFuture减少阻塞和轮询,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。

public class CompletableFutureUseDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "---come in");
            int result = ThreadLocalRandom.current().nextInt(10);
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (result > 5) { //模拟产生异常情况
                int i = 10 / 0;
            }
            System.out.println("----------1秒钟后出结果" + result);
            return result;
        }, executorService).whenComplete((v, e) -> {
            if (e == null) {
                System.out.println("计算完成 更新系统" + v);
            }
        }).exceptionally(e -> {
            e.printStackTrace();
            System.out.println("异常情况:" + e.getCause() + " " + e.getMessage());
            return null;
        });
        System.out.println(Thread.currentThread().getName() + "先去完成其他任务");
        executorService.shutdown();
    }
}

/**
 * 无异常情况
 * pool-1-thread-1---come in
 * main先去完成其他任务
 * ----------1秒钟后出结果9
 * 计算完成 更新系统9
 */

/**
 * 有异常情况
 *pool-1-thread-1---come in
 * main先去完成其他任务
 * java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
 * 异常情况:java.lang.ArithmeticException: / by zero java.lang.ArithmeticException: / by zero
 */

CompletableFuture优点:

  • 异步任务结束时,会自动回调某个对象的方法
  • 主线程设置好回调后,不用关心异步任务的执行,异步任务之间可以顺序执行
  • 异步任务出错时,会自动回调某个对象的方法

案例(电商网站比价)

电商网站比价需求分析:

  1. 需求说明:

  2. 同一款产品,同时搜索出同款产品在各大电商平台的售价

  3. 同一款产品,同时搜索出本产品在同一个电商平台下,各个入驻卖家售价是多少

  4. 输出返回:

  5. 出来结果希望是同款产品的在不同地方的价格清单列表,返回一个List

例如:《Mysql》 in jd price is 88.05 《Mysql》 in taobao price is 90.43

  1. 解决方案,对比同一个产品在各个平台上的价格,要求获得一个清单列表

  2. step by step,按部就班,查完淘宝查京东,查完京东查天猫....

  3. all in,万箭齐发,一口气多线程异步任务同时查询

package com.bilibili.juc.cf;

import lombok.*;
import lombok.experimental.Accessors;

import java.awt.print.Book;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
 *
 * 案例说明:电商比价需求,模拟如下情况:
 *
 * 1需求:
 *  1.1 同一款产品,同时搜索出同款产品在各大电商平台的售价;
 *  1.2 同一款产品,同时搜索出本产品在同一个电商平台下,各个入驻卖家售价是多少
 *
 * 2输出:出来结果希望是同款产品的在不同地方的价格清单列表,返回一个List<String>
 * 《mysql》 in jd price is 88.05
 * 《mysql》 in dangdang price is 86.11
 * 《mysql》 in taobao price is 90.43
 *
 * 3 技术要求
 *   3.1 函数式编程
 *   3.2 链式编程
 *   3.3 Stream流式计算
 */
public class CompletableFutureMallDemo
{
    static List<NetMall> list = Arrays.asList(
            new NetMall("jd"),
            new NetMall("dangdang"),
            new NetMall("taobao"),
            new NetMall("pdd"),
            new NetMall("tmall")
    );

    /**
     * step by step 一家家搜查
     * List<NetMall> ----->map------> List<String>
     * @param list
     * @param productName
     * @return
     */
    public static List<String> getPrice(List<NetMall> list,String productName)
    {
        //《mysql》 in taobao price is 90.43
        return list
                .stream()
                .map(netMall ->
                        String.format(productName + " in %s price is %.2f",
                                netMall.getNetMallName(),
                                netMall.calcPrice(productName)))
                .collect(Collectors.toList());
    }

    /**
     * List<NetMall> ----->List<CompletableFuture<String>>------> List<String>
     * @param list
     * @param productName
     * @return
     */
    public static List<String> getPriceByCompletableFuture(List<NetMall> list,String productName)
    {
        return list.stream().map(netMall ->
                CompletableFuture.supplyAsync(() -> String.format(productName + " in %s price is %.2f",
                netMall.getNetMallName(),
                netMall.calcPrice(productName))))
                .collect(Collectors.toList())
                .stream()
                .map(s -> s.join())
                .collect(Collectors.toList());
    }


    public static void main(String[] args)
    {
        long startTime = System.currentTimeMillis();
        List<String> list1 = getPrice(list, "mysql");
        for (String element : list1) {
            System.out.println(element);
        }
        long endTime = System.currentTimeMillis();
        System.out.println("----costTime: "+(endTime - startTime) +" 毫秒");

        System.out.println("--------------------");

        long startTime2 = System.currentTimeMillis();
        List<String> list2 = getPriceByCompletableFuture(list, "mysql");
        for (String element : list2) {
            System.out.println(element);
        }
        long endTime2 = System.currentTimeMillis();
        System.out.println("----costTime: "+(endTime2 - startTime2) +" 毫秒");
    }
}

class NetMall
{
    @Getter
    private String netMallName;

    public NetMall(String netMallName)
    {
        this.netMallName = netMallName;
    }

    public double calcPrice(String productName)
    {
        try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }

        return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);
    }
}

常用方法

获得结果和触发计算

  • 获取结果

    • public T get()

    • public T get(long timeout,TimeUnit unit)

    • public T join() --->和get一样的作用,只是不需要抛出异常

    • public T getNow(T valuelfAbsent) --->计算完成就返回正常值,否则返回备胎值(传入的参数),立即获取结果不阻塞

  • 主动触发计算

    • public boolean complete(T value) ---->是否打断get方法立即返回括号值

对计算结果进行处理

  • thenApply --->计算结果存在依赖关系,这两个线程串行化---->由于存在依赖关系(当前步错,不走下一步),当前步骤有异常的话就叫停
  • handle --->计算结果存在依赖关系,这两个线程串行化---->有异常也可以往下走一步

对计算结果进行消费

  • 接受任务的处理结果,并消费处理,无返回结果
public class CompletableFutureApi2Demo {
    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newFixedThreadPool(3);
        CompletableFuture.supplyAsync(() -> {
            return 1;
        }, threadPool).thenApply(f -> {
            return f + 2;
        }).thenApply(f -> {
            return f + 2;
        }).thenAccept(r -> {
            System.out.println(r);//5
        });
    }
}
  • thenRun(Runnable runnable) :任务A执行完执行B,并且不需要A的结果
  • thenAccept(Consumer action): 任务A执行完执行B,B需要A的结果,但是任务B没有返回值
  • thenApply(Function fn): 任务A执行完执行B,B需要A的结果,同时任务B有返回值
public class CompletableFutureApi2Demo {
    public static void main(String[] args) {
        System.out.println(CompletableFuture.supplyAsync(() -> "result").thenRun(() -> {}).join());//null
        System.out.println(CompletableFuture.supplyAsync(() -> "result").thenAccept(r -> System.out.println(r)).join());//result null
        System.out.println(CompletableFuture.supplyAsync(() -> "result").thenApply(f -> f + 2).join());//result2
    }
}

CompletableFuture和线程池说明

  • 如果没有传入自定义线程池,都用默认线程池ForkJoinPool

  • 传入一个线程池,如果你执行第一个任务时,传入了一个自定义线程池

    • 调用thenRun方法执行第二个任务时,则第二个任务和第一个任务时共用同一个线程池

    • 调用thenRunAsync执行第二个任务时,则第一个任务使用的是你自定义的线程池,第二个任务使用的是ForkJoin线程池

  • 备注:可能是线程处理太快,系统优化切换原则, 直接使用main线程处理,thenAccept和thenAcceptAsync,thenApply和thenApplyAsync等,之间的区别同理。

对计算速度进行选用

  • 谁快用谁
  • applyToEither
public class CompletableFutureApiDemo {
    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newFixedThreadPool(3);
        CompletableFuture<String> playA = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println("A come in");
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "playA";
        }, threadPool);


        CompletableFuture<String> playB = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println("B come in");
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "playB";
        }, threadPool);

        CompletableFuture<String> result = playA.applyToEither(playB, f -> {
            return f + " is winner";
        });

        /**
         * A come in
         * B come in
         * main-----------winner:playA is winner
         */
        System.out.println(Thread.currentThread().getName() + "-----------winner:" + result.join());
    }
}

对计算结果进行合并

  • 两个CompletableStage任务都完成后,最终能把两个任务的结果一起交给thenCombine来处理
  • 先完成的先等着,等待其他分支任务
public class CompletableFutureApi3Demo {
    public static void main(String[] args) {
        CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + " 启动");
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 10;
        });

        CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + " 启动");
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 20;
        });

        CompletableFuture<Integer> finalResult = completableFuture1.thenCombine(completableFuture2, (x, y) -> {
            System.out.println("----------开始两个结果合并");
            return x + y;
        });
        System.out.println(finalResult.join());

    }
}