Java并发-异步调用结果之Future和CompletableFuture
要执行异步任务,流程一般是让被调者立即返回,让它在后台慢慢处理这个请求。对于调用者来说,则可以先处理一些其他任务,在真正需要数据的场合再去尝试获得需要的数据。
在Java中,异步任务用Future
接口来表示。
# Future
先来看一个Future
用法的演示:
void future() throws ExecutionException, InterruptedException {
FutureTask<Double> futureTask = new FutureTask<>(() -> {
Thread.sleep(3000);
return Math.pow(10, 2);
});
Thread thread = new Thread(futureTask);
thread.start();
System.out.println(futureTask.get());
}
FutureTask
继承自RunnableFuture
,RunnableFuture
继承Runnable
和Future
。
Future
的主要方法:
- cancel。取消
- isCanceled。是否已取消
- isDone。是否已完成
- get。获得结果,会阻塞
- get(timeout)。带超时时间的获得结果,如果超时,则抛出异常
其实线程池的submit方法也使用了FutureTask
来对任务进行封装:
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
# Future
增强版之CompletableFuture
Java 8开始引入了CompletableFuture
,它针对Future
做了改进,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。
CompletableFuture
实现了CompletionStage
接口,从而实现了一个超大型的工具类。
先看一下基本的用法:
CompletableFuture<Double> cf = CompletableFuture.supplyAsync(this::fetchPrice);
cf.thenApply(d->d+10)
.thenApply(d->d*2)
.thenAccept((result) -> System.out.println("price: " + result))
.exceptionally((e) -> {
e.printStackTrace();
return null;
});
执行了一个异步任务,然后对他进行了加工(thenApply),对结果进行了接受(thenAccept),对异常进行了处理(exceptionally)。
CompletableFuture
中的方法众多,下面按照类别来进行介绍。
# 创建类
- completeFuture。可以用于创建默认返回值
- runAsync。异步执行,无返回值
- supplyAsync。异步执行,有返回值
- anyOf。任意一个执行完成,就可以进行下一步动作
- allOf。全部完成所有任务,才可以进行下一步任务
代码演示:
// 直接值
CompletableFuture<String> f1 = CompletableFuture.completedFuture("hello");
// 无返回值
CompletableFuture<Void> f2 = CompletableFuture.runAsync(() -> System.out.println(1));
// 有返回值
CompletableFuture<Integer> f3 = CompletableFuture.supplyAsync(() -> 100);
// 并行执行,最早的一个任务完成即返回
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(f1, f2, f3);
// 并行执行,所有的任务完成才返回
CompletableFuture<Void> allOf = CompletableFuture.allOf(f1, f2, f3);
# 状态取值类
- join。等待合并结果
- get。合并等待结果,可以增加超时时间。get和join区别:join只会抛出unchecked异常,get会返回具体的异常
- getNow。如果结果计算完成或者异常了,则返回结果或异常;否则,返回给定的默认值
- isCancelled。是否已取消
- isCompletedExceptionally。是否执行异常
- isDone。是否执行完成
# 控制类
- complete。直接完成任务,让get可以直接获取到值
- completeExceptionally。直接抛出异常,结束任务执行
- cancel。取消任务
# 接续类
- thenApply, thenApplyAsync。对执行结果进行再包装
- thenAccept, thenAcceptAsync。对执行结果进行接受和利用
- thenRun, thenRunAsync。对执行结果下一步的执行
- thenCompose, thenComposeAsync。组合另一个任务
- whenComplete, whenCompleteAsync。阶段完成时(包括成功和异常),对结果进行处理
- handle, handleAsync。阶段完成时(包括成功和异常),对结果进行处理。和whenComplete的区别是它会再返回一个结果
- exceptionally。对异常结果进行处理
- thenCombine, thenCombineAsync。把任务和另一个任务进行组合,当两个任务都执行完成后,返回结果。有入参,有返回值
- thenAcceptBoth, thenAcceptBothAsync。把任务和另一个任务进行组合,当两个任务都执行完成后,返回结果。有入参,无返回值
- runAfterBoth, runAfterBothAsync。把任务和另一个任务进行组合,当两个任务都执行完成后,返回结果。无入参,无返回值
- applyToEither, applyToEitherAsync。把任务和另一个任务进行组合,有其中任何一个执行完成,返回结果。有入参,有返回值
- acceptEither, acceptEitherAsync。把任务和另一个任务进行组合,有其中任何一个执行完成,返回结果。有入参,无返回值
- runAfterEither, runAfterEitherAsync。把任务和另一个任务进行组合,有其中任何一个执行完成,返回结果。无入参,无返回值
接续类都提供了一个Async方法,比如thenCombine对应有一个thenCombineAsync,他们的区别在于执行合并操作用到哪个线程?
thenCombine 方法会在 future1 和 future2 都完成后,使用当前线程(即调用 thenCombine 的线程)来执行结果的合并操作。
thenCombineAsync 方法会在 future1 和 future2 都完成后,使用一个异步线程池(如 ForkJoinPool.commonPool())中的线程来执行结果的合并操作。
代码演示:
CompletableFuture<String> rice = CompletableFuture.supplyAsync(()->{
System.out.println("开始制作米饭,并获得煮熟的米饭");
return "煮熟的米饭";
});
rice.thenApply(r->"success");
rice.thenAccept(r->{});
rice.thenRun(()->{});
rice.thenCompose(r->CompletableFuture.completedFuture("continue"));
rice.whenComplete((s,t)->{});
rice.handle((s,t)->"final");
rice.exceptionally(t->"final");
//煮米饭的同时呢,我又做了牛奶
CompletableFuture mike = CompletableFuture.supplyAsync(()->{
System.out.println("开始热牛奶,并获得加热的牛奶");
return "加热的牛奶";
});
// 我想两个都好了,才吃早饭,thenCombineAsync有入参,有返回值
mike.thenCombineAsync(rice,(m,r)->{
System.out.println("我收获了早饭:"+m+","+r);
return String.valueOf(m) + r;
});
// 有入参,无返回值
mike.thenAcceptBothAsync(rice,(m,r)->{
System.out.println("我收获了早饭:"+m+","+r);
});
// 无入参,无返回值
mike.runAfterBothAsync(rice,()->{
System.out.println("我收获了早饭");
});
mike.applyToEither(rice,(r)-> "已完成一个任务");
mike.acceptEither(rice,r->{});
mike.runAfterEither(rice,()->{});
# 执行线程
默认情况下,CompletableFuture
的执行线程取决于你如何创建和调用它,但它的行为可以通过显式指定线程池来改变。
# 默认执行线程
CompletableFuture
的默认执行线程取决于它的创建方式:
使用
CompletableFuture.runAsync
或CompletableFuture.supplyAsync
:- 如果没有指定线程池,任务会提交到 ForkJoinPool.commonPool(),这是 Java 8 引入的一个全局共享的线程池。
ForkJoinPool.commonPool()
的线程数默认是 CPU 核心数减一(Runtime.getRuntime().availableProcessors() - 1
)。
直接创建
CompletableFuture
并手动完成:- 如果你直接创建
CompletableFuture
并调用complete
或completeExceptionally
,任务的执行线程由调用者决定。
- 如果你直接创建
# 改变执行线程
你可以通过显式指定线程池来改变 CompletableFuture
的执行线程。CompletableFuture
提供了以下方法支持自定义线程池:
runAsync(Runnable runnable, Executor executor)
:- 使用指定的线程池执行
Runnable
任务。
- 使用指定的线程池执行
supplyAsync(Supplier<U> supplier, Executor executor)
:- 使用指定的线程池执行
Supplier
任务。
- 使用指定的线程池执行
# 示例代码
以下示例展示了如何使用默认线程池和自定义线程池:
默认线程池:
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("任务执行线程: " + Thread.currentThread().getName());
});
future.join(); // 等待任务完成
输出:
任务执行线程: ForkJoinPool.commonPool-worker-1
自定义线程池:
ExecutorService customExecutor = Executors.newFixedThreadPool(2);
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("任务执行线程: " + Thread.currentThread().getName());
}, customExecutor);
future.join(); // 等待任务完成
customExecutor.shutdown(); // 关闭线程池
输出:
任务执行线程: pool-1-thread-1
# 线程池的选择
根据任务的特点,可以选择不同的线程池:
ForkJoinPool.commonPool()
:适合 CPU 密集型任务,默认线程数较少。Executors.newCachedThreadPool()
:适合短生命周期的任务,线程池会根据需要创建新线程。Executors.newFixedThreadPool(int nThreads)
:适合需要控制并发数的场景。- 自定义线程池:根据具体需求配置线程池参数。
注意事项
- 线程池管理:如果使用自定义线程池,记得在任务完成后关闭线程池(
shutdown()
或shutdownNow()
),以避免资源泄漏。 - 默认线程池的限制:
ForkJoinPool.commonPool()
的线程数有限,不适合 I/O 密集型任务或高并发场景。 - 异步链式调用:在
CompletableFuture
的链式调用中,可以使用thenRunAsync
、thenApplyAsync
等方法指定后续操作的线程池。
# 链式调用中的线程切换
在 CompletableFuture
的链式调用中,可以通过 *Async
方法显式切换线程池:
ExecutorService executor1 = Executors.newFixedThreadPool(2);
ExecutorService executor2 = Executors.newFixedThreadPool(2);
CompletableFuture.supplyAsync(() -> {
System.out.println("任务 1 执行线程: " + Thread.currentThread().getName());
return "Hello";
}, executor1).thenApplyAsync(result -> {
System.out.println("任务 2 执行线程: " + Thread.currentThread().getName());
return result + " World";
}, executor2).thenAcceptAsync(finalResult -> {
System.out.println("任务 3 执行线程: " + Thread.currentThread().getName());
System.out.println("最终结果: " + finalResult);
}, executor1).join();
executor1.shutdown();
executor2.shutdown();
输出:
任务 1 执行线程: pool-1-thread-1
任务 2 执行线程: pool-2-thread-1
任务 3 执行线程: pool-1-thread-2
最终结果: Hello World
# CompletionService
想象浏览器渲染图片的场景:多个图片下载任务提交到线程池,最先下载成功的图片先展示。
根据上述的知识,使用线程池+Future好像实现不了最先展示逻辑;使用CompletableFuture
好像也不符合实际。
这时候就需要CompletionService
登场了,它整合了Executor
和BlockingQueue
的功能,可以让我们获得最先执行完成的任务。
看一下实际的实例:
public static void main(String[] args) throws Exception {
int imageSize = 10;
ExecutorService service = Executors.newFixedThreadPool(imageSize);
CompletionService<String> completionService = new ExecutorCompletionService<String>(service);
for (int i = 0; i < imageSize; i++) {
completionService.submit(new DownloadImg(i));
}
renderText();
for (int i = 0; i < imageSize; i++) {
// 会一直获取到最新执行完成的任务结果
renderImg(completionService.take().get());
}
service.shutdown();
}
private static void renderImg(String s) { ... }
private static void renderText() { ... }
private static class DownloadImg implements Callable<String> { ... }
# 总结
本章主要讲了Future
和CompletableFuture
,他们是一脉相承,在主流的框架中都已经开始使用CompletableFuture来完成异步调用,它确实非常强大。
至于CompletionService
,他是给了我们另一种选择,在某些场景下能大大提高了开发的效率。
今天讨论和分享了异步结果调用的三个主要类,希望你有所收获!
祝你变得更强!