Callable
理论
(1)callable原理分析
Future 接口
当 call()方法完成时,结果必须存储在主线程已知的对象中,以便主线程可以知道该线程返回的结果。为此,可以使用 Future 对象。 将 Future 视为保存结果的对象–它可能暂时不保存结果,但将来会保存(一旦Callable 返回)。Future 基本上是主线程可以跟踪进度以及其他线程的结果的一种方式。要实现此接口,必须重写 5 种方法,这里列出了重要的方法,如下: • public boolean cancel(boolean mayInterrupt):用于停止任务。如果尚未启动,它将停止任务。如果已启动,则仅在mayInterrupt 为 true时才会中断任务。 • public Object get()抛出 InterruptedException,ExecutionException:用于获取任务的结果。如果任务完成,它将立即返回结果,否则将等待任务完成,然后返回结果。
• public boolean isDone():如果任务完成,则返回 true,否则返回 false可以看到 Callable 和 Future 做两件事-Callable 与 Runnable 类似,因为它封装了要在另一个线程上运行的任务,而 Future 用于存储从另一个线程获得的结果。实际上,future 也可以与 Runnable 一起使用。要创建线程,需要 Runnable。为了获得结果,需要 future。
FutureTask
Java 库具有具体的 FutureTask 类型,该类型实现 Runnable 和 Future,并方便地将两种功能组合在一起。 可以通过为其构造函数提供 Callable 来创建FutureTask。然后,将 FutureTask 对象提供给 Thread 的构造函数以创建Thread 对象。因此,间接地使用 Callable 创建线程。
(2)callable应用场景
• 在主线程中需要执行比较耗时的操作时,但又不想阻塞主线程时,可以把这些作业交给 Future 对象在后台完成, 当主线程将来需要时,就可以通过 Future对象获得后台作业的计算结果或者执行状态 • 一般 FutureTask 多用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果 • 仅在计算完成时才能检索结果;如果计算尚未完成,则阻塞 get 方法。一旦计算完成,就不能再重新开始或取消计算。get 方法而获取结果只有在计算完成时获取,否则会一直阻塞直到任务转入完成状态,然后会返回结果或者抛出异常。 • 只计算一次
(3)核心原理:
在主线程中需要执行比较耗时的操作时,但又不想阻塞主线程时,可以把这些作业交给 Future 对象在后台完成 • 当主线程将来需要时,就可以通过 Future 对象获得后台作业的计算结果或者执行状态 • 一般 FutureTask 多用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果。 • 仅在计算完成时才能检索结果;如果计算尚未完成,则阻塞 get 方法 • 一旦计算完成,就不能再重新开始或取消计算 • get 方法而获取结果只有在计算完成时获取,否则会一直阻塞直到任务转入完成状态,然后会返回结果或者抛出异常 • get 只计算一次,因此 get 方法放到最后
语法
与Runnable接口类似,但Callable接口的call()方法可以返回结果或抛出异常。可以通过FutureTask类包装Callable对象,然后传递给Thread类进行启动。
class MyCallable implements Callable<Integer> {
public Integer call() {
// 线程执行的逻辑
return result;
}
}
FutureTask<Integer> futureTask = new FutureTask<>(new MyCallable());
Thread thread = new Thread(futureTask);
thread.start();
FutureTask
1、理论
- 定义与功能:
- FutureTask是Java并发编程中的一个类,结合了Future和Runnable接口,用于表示一个异步计算的结果。
- 它允许将耗时的计算任务提交给另一个线程去执行,而当前线程可以继续执行其他任务,从而实现并发执行的效果。
- FutureTask主要解决异步执行、获取计算结果、异常处理以及任务取消等问题。
- 状态管理:
- FutureTask内部维护了一个状态变量(如state),用于表示任务的不同状态,如未启动、运行中、已完成等。
- 状态变量的变化通过CAS(Compare-and-Swap)等原子操作来保证线程安全。
- 任务执行:
- FutureTask通常与Callable或Runnable接口一起使用。当使用Callable时,可以获取到任务执行的结果;当使用Runnable时,则无法获取结果。
- FutureTask实现了Runnable接口,因此可以直接提交给线程池执行,或者由调用线程直接执行(FutureTask.run())。
- 结果获取:
- 通过调用Future.get()方法可以获取异步计算的结果。如果计算还没有完成,get()方法会阻塞调用线程,直到计算完成为止。
- 如果异步计算抛出了异常,那么调用get()方法时会抛出ExecutionException,从而可以方便地处理异常。
- 任务取消:
- 通过Future.cancel()方法可以取消还没有开始或者正在进行的异步计算。
- 取消操作的结果取决于任务是否已经启动、是否支持取消以及是否已经执行完成。
- 内部实现:
- FutureTask的实现涉及到了并发编程中的多个关键概念,如原子操作、状态管理、阻塞队列等。
- 它通过内部维护的状态变量和等待/通知机制来协调生产者和消费者线程之间的交互。
- 使用场景:
- 当需要异步执行某个计算任务,并且需要获取任务执行的结果时,可以使用FutureTask。
- 它可以提高程序的响应性,使得在等待耗时任务完成的同时,可以继续执行其他任务。
2、使用
public class CompletableFutureDemo
{
public static void main(String[] args) throws ExecutionException, InterruptedException
{
FutureTask<String> futureTask = new FutureTask<>(new MyThread());
Thread t1 = new Thread(futureTask,"t1");
t1.start();
try { TimeUnit.MILLISECONDS.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println(futureTask.get());
}
}
class MyThread implements Callable<String>
{
@Override
public String call() throws Exception
{
System.out.println("-----come in call() " );
try { TimeUnit.MILLISECONDS.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); }
return "hello Callable";
}
}
3、优缺点
优点:Future+线程池异步多线程任务配合,能显著提高程序的运行效率。
缺点:
- get()阻塞---一旦调用get()方法求结果,一旦调用不见不散,非要等到结果才会离开,不管你是否计算完成,如果没有计算完成容易程序堵塞。
- isDone()轮询---轮询的方式会耗费无谓的cpu资源,而且也不见得能及时得到计算结果,如果想要异步获取结果,通常会以轮询的方式去获取结果,尽量不要阻塞。
结论:Future对于结果的获取不是很友好,只能通过阻塞或轮询的方式得到任务的结果。
对于简单的业务场景使用Future完全ok
回调通知:
- 应对Future的完成时间,完成了可以告诉我,也就是我们的回调通知
- 通过轮询的方式去判断任务是否完成这样非常占cpu并且代码也不优雅
创建异步任务:Future+线程池组合
-
多个任务前后依赖可以组合处理(水煮鱼--->买鱼--->调料--->下锅):
-
想将多个异步任务的结果组合起来,后一个异步任务的计算结果需要钱一个异步任务的值
-
想将两个或多个异步计算合并成为一个异步计算,这几个异步计算互相独立,同时后面这个又依赖前一个处理的结果
-
对计算速度选最快的:
- 当Future集合中某个任务最快结束时,返回结果,返回第一名处理结果
结论:
- 使用Future之前提供的那点API就囊中羞涩,处理起来不够优雅,这时候还是让CompletableFuture以声明式的方式优雅的处理这些需求。
- 从i到i++
- Future能干的,CompletableFuture都能干
CompletableFuture(异步回调)
理论
Future
Future是Java5新加的一个接口,它提供一种异步并行计算的功能,如果主线程需要执行一个很耗时的计算任务,我们会就可以通过Future把这个任务放进异步线程中执行,主线程继续处理其他任务或者先行结束,再通过Future获取计算结果。
Futrue 在 Java 里面,通常用来表示一个异步任务的引用,比如我们将任务提交到线程池里面,然后我们会得到一个 Futrue,在 Future 里面有 isDone 方法来 判断任务是否处理结束,还有 get 方法可以一直阻塞直到任务结束然后获取结果,但整体来说这种方式,还是同步的,因为需要客户端不断阻塞等待或者不断轮询才能知道任务是否完成。
Future 缺点
(1)不支持手动完成 我提交了一个任务,但是执行太慢了,我通过其他路径已经获取到了任务结果,现在没法把这个任务结果通知到正在执行的线程,所以必须主动取消或者一直等待它执行完成 (2)不支持进一步的非阻塞调用 通过 Future 的 get 方法会一直阻塞到任务完成,但是想在获取任务之后执行额外的任务,因为 Future 不支持回调函数,所以无法实现这个功能 (3)不支持链式调用 对于 Future 的执行结果,我们想继续传到下一个 Future 处理使用,从而形成一个链式的 pipline 调用,这在 Future 中是没法实现的。 (4)不支持多个 Future 合并 比如我们有 10 个 Future 并行执行,我们想在所有的 Future 运行完毕之后,执行某些函数,是没法通过 Future 实现的。 (5)不支持异常处理 Future 的 API 没有任何的异常处理的 api,所以在异步运行时,如果出了问题是不好定位的。
CompletionStage
用于异步执行中的阶段处理。这个接口定义了一组方法,这些方法使得在一个阶段执行结束后,可以继续执行下一个阶段,或者对结果进行转换以产生新的结果。
具体来说,CompletionStage的接口方法可以分为几类:
基于结果的转换:
apply
和combine
方法允许将上一阶段的结果作为参数传递给指定的函数,并返回新的结果。这些方法在Function或BiFunction类型的接口参数下工作。
基于结果的消费:
accept
方法允许将上一阶段的结果作为参数传递给指定的函数,但不返回结果。这种方法在Consumer或BiConsumer类型的接口参数下工作。
不依赖结果的执行:
run
方法允许在上一阶段执行结束后,执行指定的操作,但不依赖于一阶段的结果。这个方法的接口参数为Runnable类型。
异步执行:
async
方法允许异步执行阶段任务,可以选择指定线程池或不指定。
组合多个阶段:
both
方法表示当前两个阶段都完成后才执行下一阶段。either
方法表示当前两个阶段中任一完成后就执行下一阶段。compose
方法基于上阶段的执行完状态,执行下一阶段。
异常处理:
handle
和exceptionally
方法允许基于上阶段的执行完状态和结果,消费其正常或异常结果。
CompletionStage的实现类,如CompletableFuture,扩展了这个接口,提供了更多的功能和灵活性。通过使用CompletionStage和它的实现类,你可以更轻松地创建复杂的异步执行流程和任务组合。
总之,CompletionStage是Java 8中用于处理异步执行阶段和结果的重要接口,它提供了一组强大的方法来定义和管理异步任务之间的关系和转换。
CompletableFuture 简介
CompletableFuture 在 Java 里面被用于异步编程,异步通常意味着非阻塞,可以使得我们的任务单独运行在与主线程分离的其他线程中,并且通过回调可以在主线程中得到异步任务的执行状态,是否完成,和是否异常等信息。CompletableFuture 实现了 Future, CompletionStage 接口,实现了 Future接口就可以兼容现在有线程池框架,而 CompletionStage 接口才是异步编程的接口抽象,里面定义多种异步方法,通过这两者集合,从而打造出了强大的CompletableFuture 类。
CompletableFuture对Future的改进
get()方法在Future计算完成之前会一直处在阻塞状态下,阻塞的方式和异步编程的设计理念相违背。
isDene()方法容易耗费cpu资源(cpu空转)
对于真正的异步处理我们希望是可以通过传入回调函数,在Future结束时自动调用该回调函数,这样,我们就不用等待结果
jdk8设计出CompletableFuture,CompletableFuture提供了一种观察者模式类似的机制,可以让任务执行完成后通知监听的一方。
CompletableFuture和CompletionStage
接口CompletionStage
- 代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段。
- 一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发
类CompletableFuture
- 提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合CompletableFuture的方法
- 它可能代表一个明确完成的Future,也可能代表一个完成阶段(CompletionStage),它支持在计算完成以后触发一些函数或执行某些动作
语法
Java 8 引入的一个类,用于支持异步编程和构建异步操作的结果。它提供了一种简洁的方式来处理异步任务,并且能够方便地进行任务之间的组合、串行执行、并行执行以及异常处理。
创建 CompletableFuture:
static CompletableFuture<Void> runAsync(Runnable runnable)
:异步执行一个没有返回值的任务。static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
:异步执行一个有返回值的任务。
处理异步结果:
CompletableFuture<T> thenApply(Function<? super T,? extends U> fn)
:当上一个任务完成时,对其结果执行给定的函数。CompletableFuture<Void> thenAccept(Consumer<? super T> action)
:当上一个任务完成时,对其结果执行给定的操作,但不返回任何结果。CompletableFuture<Void> thenRun(Runnable action)
:当上一个任务完成时,执行给定的操作,但不接收上一个任务的结果。CompletableFuture<U> thenCompose(Function<? super T,? extends CompletionStage<U>> fn)
:当上一个任务完成时,对其结果执行给定的函数,返回一个新的CompletableFuture
。
组合多个 CompletableFuture:
CompletableFuture<U> thenCombine(CompletionStage<? extends V> other, BiFunction<? super T,? super U,? extends V> fn)
:当当前任务和另一个任务都完成时,对它们的结果执行给定的函数。CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
:当当前任务和另一个任务都完成时,对它们的结果执行给定的操作,但不返回任何结果。CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)
:当当前任务和另一个任务都完成时,执行给定的操作,但不接收任何结果。CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
:当任意一个任务完成时,对其结果执行给定的操作,但不返回任何结果。CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action)
:当任意一个任务完成时,执行给定的操作,但不接收任何结果。
异常处理:
CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
:当异常发生时,执行给定的函数。CompletableFuture<T> handle(BiFunction<? super T,Throwable,? extends T> fn)
:当任务完成时,对其结果执行给定的函数,或者在出现异常时执行另一个函数。
等待所有任务完成:
static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
:等待所有CompletableFuture
都完成后返回一个新的CompletableFuture
。
等待任意任务完成:
static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
:等待任意一个CompletableFuture
完成后返回一个新的CompletableFuture
。
取消任务:
boolean cancel(boolean mayInterruptIfRunning)
:尝试取消任务的执行。
创建方法
基本使用
//同步调用
CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(()->{
System.out.println(Thread.currentThread().getName()+" : CompletableFuture1");
});
completableFuture1.get();
//mq消息队列
//异步调用
CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getName()+" : CompletableFuture2");
//模拟异常
int i = 10/0;
return 1024;
});
completableFuture2.whenComplete((t,u)->{
System.out.println("------t="+t);
System.out.println("------u="+u);
}).get();
runAsync
方法用于异步地执行一个不返回结果的Runnable
任务。
runAsync(Runnable runnable)
- 接收一个
Runnable
对象作为参数。 - 使用
ForkJoinPool.commonPool()
作为默认的线程池来异步执行这个任务。 - 返回一个
CompletableFuture<Void>
,因为Runnable
不返回任何值(即返回void
)。
runAsync(Runnable runnable, Executor executor)
- 除了接收一个
Runnable
对象作为参数外,还接收一个Executor
对象来指定执行任务的线程池。 - 使用指定的
Executor
来异步执行这个任务。 - 同样返回一个
CompletableFuture<Void>
。
supplyAsync
方法用于异步地执行一个返回结果的Supplier
任务。
supplyAsync(Supplier<U> supplier)
- 接收一个
Supplier
对象作为参数,Supplier
是一个函数式接口,它的get()
方法返回一个结果。 - 使用
ForkJoinPool.commonPool()
作为默认的线程池来异步执行这个任务。 - 返回一个
CompletableFuture<U>
,其中U
是Supplier
的get()
方法返回的类型。
supplyAsync(Supplier<U> supplier, Executor executor)
- 除了接收一个
Supplier
对象作为参数外,还接收一个Executor
对象来指定执行任务的线程池。 - 使用指定的
Executor
来异步执行这个任务。 - 返回一个
CompletableFuture<U>
。
对于上述Executor参数说明:若没有指定,则使用默认的ForkJoinPoolcommonPool()作为它的线程池执行异步代码,如果指定线程池,则使用我们自定义的或者特别指定的线程池执行异步代码
public class CompletableFutureBuildDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(3);
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
System.out.println(Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
},executorService);
System.out.println(completableFuture.get()); //null
CompletableFuture<String> objectCompletableFuture = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello supplyAsync";
},executorService);
System.out.println(objectCompletableFuture.get());//hello supplyAsync
executorService.shutdown();
}
}
CompletableFuture减少阻塞和轮询,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。
public class CompletableFutureUseDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(3);
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "---come in");
int result = ThreadLocalRandom.current().nextInt(10);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (result > 5) { //模拟产生异常情况
int i = 10 / 0;
}
System.out.println("----------1秒钟后出结果" + result);
return result;
}, executorService).whenComplete((v, e) -> {
if (e == null) {
System.out.println("计算完成 更新系统" + v);
}
}).exceptionally(e -> {
e.printStackTrace();
System.out.println("异常情况:" + e.getCause() + " " + e.getMessage());
return null;
});
System.out.println(Thread.currentThread().getName() + "先去完成其他任务");
executorService.shutdown();
}
}
/**
* 无异常情况
* pool-1-thread-1---come in
* main先去完成其他任务
* ----------1秒钟后出结果9
* 计算完成 更新系统9
*/
/**
* 有异常情况
*pool-1-thread-1---come in
* main先去完成其他任务
* java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
* 异常情况:java.lang.ArithmeticException: / by zero java.lang.ArithmeticException: / by zero
*/
CompletableFuture优点:
- 异步任务结束时,会自动回调某个对象的方法
- 主线程设置好回调后,不用关心异步任务的执行,异步任务之间可以顺序执行
- 异步任务出错时,会自动回调某个对象的方法
案例(电商网站比价)
电商网站比价需求分析:
-
需求说明:
-
同一款产品,同时搜索出同款产品在各大电商平台的售价
-
同一款产品,同时搜索出本产品在同一个电商平台下,各个入驻卖家售价是多少
-
输出返回:
-
出来结果希望是同款产品的在不同地方的价格清单列表,返回一个List
例如:《Mysql》 in jd price is 88.05 《Mysql》 in taobao price is 90.43
-
解决方案,对比同一个产品在各个平台上的价格,要求获得一个清单列表
-
step by step,按部就班,查完淘宝查京东,查完京东查天猫....
-
all in,万箭齐发,一口气多线程异步任务同时查询
package com.bilibili.juc.cf;
import lombok.*;
import lombok.experimental.Accessors;
import java.awt.print.Book;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
*
* 案例说明:电商比价需求,模拟如下情况:
*
* 1需求:
* 1.1 同一款产品,同时搜索出同款产品在各大电商平台的售价;
* 1.2 同一款产品,同时搜索出本产品在同一个电商平台下,各个入驻卖家售价是多少
*
* 2输出:出来结果希望是同款产品的在不同地方的价格清单列表,返回一个List<String>
* 《mysql》 in jd price is 88.05
* 《mysql》 in dangdang price is 86.11
* 《mysql》 in taobao price is 90.43
*
* 3 技术要求
* 3.1 函数式编程
* 3.2 链式编程
* 3.3 Stream流式计算
*/
public class CompletableFutureMallDemo
{
static List<NetMall> list = Arrays.asList(
new NetMall("jd"),
new NetMall("dangdang"),
new NetMall("taobao"),
new NetMall("pdd"),
new NetMall("tmall")
);
/**
* step by step 一家家搜查
* List<NetMall> ----->map------> List<String>
* @param list
* @param productName
* @return
*/
public static List<String> getPrice(List<NetMall> list,String productName)
{
//《mysql》 in taobao price is 90.43
return list
.stream()
.map(netMall ->
String.format(productName + " in %s price is %.2f",
netMall.getNetMallName(),
netMall.calcPrice(productName)))
.collect(Collectors.toList());
}
/**
* List<NetMall> ----->List<CompletableFuture<String>>------> List<String>
* @param list
* @param productName
* @return
*/
public static List<String> getPriceByCompletableFuture(List<NetMall> list,String productName)
{
return list.stream().map(netMall ->
CompletableFuture.supplyAsync(() -> String.format(productName + " in %s price is %.2f",
netMall.getNetMallName(),
netMall.calcPrice(productName))))
.collect(Collectors.toList())
.stream()
.map(s -> s.join())
.collect(Collectors.toList());
}
public static void main(String[] args)
{
long startTime = System.currentTimeMillis();
List<String> list1 = getPrice(list, "mysql");
for (String element : list1) {
System.out.println(element);
}
long endTime = System.currentTimeMillis();
System.out.println("----costTime: "+(endTime - startTime) +" 毫秒");
System.out.println("--------------------");
long startTime2 = System.currentTimeMillis();
List<String> list2 = getPriceByCompletableFuture(list, "mysql");
for (String element : list2) {
System.out.println(element);
}
long endTime2 = System.currentTimeMillis();
System.out.println("----costTime: "+(endTime2 - startTime2) +" 毫秒");
}
}
class NetMall
{
@Getter
private String netMallName;
public NetMall(String netMallName)
{
this.netMallName = netMallName;
}
public double calcPrice(String productName)
{
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);
}
}
常用方法
获得结果和触发计算
-
获取结果
-
public T get()
-
public T get(long timeout,TimeUnit unit)
-
public T join() --->和get一样的作用,只是不需要抛出异常
-
public T getNow(T valuelfAbsent) --->计算完成就返回正常值,否则返回备胎值(传入的参数),立即获取结果不阻塞
-
-
主动触发计算
- public boolean complete(T value) ---->是否打断get方法立即返回括号值
对计算结果进行处理
- thenApply --->计算结果存在依赖关系,这两个线程串行化---->由于存在依赖关系(当前步错,不走下一步),当前步骤有异常的话就叫停
- handle --->计算结果存在依赖关系,这两个线程串行化---->有异常也可以往下走一步
对计算结果进行消费
- 接受任务的处理结果,并消费处理,无返回结果
public class CompletableFutureApi2Demo {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newFixedThreadPool(3);
CompletableFuture.supplyAsync(() -> {
return 1;
}, threadPool).thenApply(f -> {
return f + 2;
}).thenApply(f -> {
return f + 2;
}).thenAccept(r -> {
System.out.println(r);//5
});
}
}
- thenRun(Runnable runnable) :任务A执行完执行B,并且不需要A的结果
- thenAccept(Consumer action): 任务A执行完执行B,B需要A的结果,但是任务B没有返回值
- thenApply(Function fn): 任务A执行完执行B,B需要A的结果,同时任务B有返回值
public class CompletableFutureApi2Demo {
public static void main(String[] args) {
System.out.println(CompletableFuture.supplyAsync(() -> "result").thenRun(() -> {}).join());//null
System.out.println(CompletableFuture.supplyAsync(() -> "result").thenAccept(r -> System.out.println(r)).join());//result null
System.out.println(CompletableFuture.supplyAsync(() -> "result").thenApply(f -> f + 2).join());//result2
}
}
CompletableFuture和线程池说明
-
如果没有传入自定义线程池,都用默认线程池ForkJoinPool
-
传入一个线程池,如果你执行第一个任务时,传入了一个自定义线程池
-
调用thenRun方法执行第二个任务时,则第二个任务和第一个任务时共用同一个线程池
-
调用thenRunAsync执行第二个任务时,则第一个任务使用的是你自定义的线程池,第二个任务使用的是ForkJoin线程池
-
-
备注:可能是线程处理太快,系统优化切换原则, 直接使用main线程处理,thenAccept和thenAcceptAsync,thenApply和thenApplyAsync等,之间的区别同理。
对计算速度进行选用
- 谁快用谁
- applyToEither
public class CompletableFutureApiDemo {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newFixedThreadPool(3);
CompletableFuture<String> playA = CompletableFuture.supplyAsync(() -> {
try {
System.out.println("A come in");
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "playA";
}, threadPool);
CompletableFuture<String> playB = CompletableFuture.supplyAsync(() -> {
try {
System.out.println("B come in");
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "playB";
}, threadPool);
CompletableFuture<String> result = playA.applyToEither(playB, f -> {
return f + " is winner";
});
/**
* A come in
* B come in
* main-----------winner:playA is winner
*/
System.out.println(Thread.currentThread().getName() + "-----------winner:" + result.join());
}
}
对计算结果进行合并
- 两个CompletableStage任务都完成后,最终能把两个任务的结果一起交给thenCombine来处理
- 先完成的先等着,等待其他分支任务
public class CompletableFutureApi3Demo {
public static void main(String[] args) {
CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " 启动");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 10;
});
CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " 启动");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 20;
});
CompletableFuture<Integer> finalResult = completableFuture1.thenCombine(completableFuture2, (x, y) -> {
System.out.println("----------开始两个结果合并");
return x + y;
});
System.out.println(finalResult.join());
}
}