轩辕李的博客 轩辕李的博客
首页
  • Java
  • Spring
  • 其他语言
  • 工具
  • HTML&CSS
  • JavaScript
  • 分布式
  • 代码质量管理
  • 基础
  • 操作系统
  • 计算机网络
  • 编程范式
  • 安全
  • 中间件
  • 心得
关于
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

轩辕李

勇猛精进,星辰大海
首页
  • Java
  • Spring
  • 其他语言
  • 工具
  • HTML&CSS
  • JavaScript
  • 分布式
  • 代码质量管理
  • 基础
  • 操作系统
  • 计算机网络
  • 编程范式
  • 安全
  • 中间件
  • 心得
关于
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • Java

    • 核心

    • 并发

      • Java并发-线程基础与synchronized关键字
      • Java并发-重入锁ReetrantLock的使用
      • Java并发-信号量Semaphore
      • Java并发-读写锁ReadWriteLock
      • Java并发-倒计时器CountDownLatch
      • Java并发-栅栏CyclicBarrier
      • Java并发-LockSupport线程阻塞工具类
      • Java并发-线程池ThreadPoolExecutor
      • Java并发-阻塞队列BlockingQueue
      • Java并发-以空间换时间之ThreadLocal
      • Java并发-无锁策略CAS与atomic包
      • Java并发-JDK并发容器
      • Java并发-异步调用结果之Future和CompletableFuture
        • Future
        • Future增强版之CompletableFuture
          • 创建类
          • 状态取值类
          • 控制类
          • 接续类
          • 执行线程
          • 默认执行线程
          • 改变执行线程
          • 示例代码
          • 线程池的选择
          • 链式调用中的线程切换
        • CompletionService
        • 总结
      • Java并发-Fork Join框架
      • Java并发-调试与诊断
    • 经验

    • JVM

    • 企业应用

  • Spring

  • 其他语言

  • 工具

  • 后端
  • Java
  • 并发
轩辕李
2021-12-21
目录

Java并发-异步调用结果之Future和CompletableFuture

要执行异步任务,流程一般是让被调者立即返回,让它在后台慢慢处理这个请求。对于调用者来说,则可以先处理一些其他任务,在真正需要数据的场合再去尝试获得需要的数据。
在Java中,异步任务用Future接口来表示。

# Future

先来看一个Future用法的演示:

    void future() throws ExecutionException, InterruptedException {
        FutureTask<Double> futureTask = new FutureTask<>(() -> {
            Thread.sleep(3000);
            return Math.pow(10, 2);
        });
        Thread thread = new Thread(futureTask);
        thread.start();
        System.out.println(futureTask.get());
    }

FutureTask继承自RunnableFuture,RunnableFuture继承Runnable和Future。
Future的主要方法:

  • cancel。取消
  • isCanceled。是否已取消
  • isDone。是否已完成
  • get。获得结果,会阻塞
  • get(timeout)。带超时时间的获得结果,如果超时,则抛出异常

其实线程池的submit方法也使用了FutureTask来对任务进行封装:

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

# Future增强版之CompletableFuture

Java 8开始引入了CompletableFuture,它针对Future做了改进,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。
CompletableFuture实现了CompletionStage接口,从而实现了一个超大型的工具类。
先看一下基本的用法:

        CompletableFuture<Double> cf = CompletableFuture.supplyAsync(this::fetchPrice);
        cf.thenApply(d->d+10)
        .thenApply(d->d*2)
        .thenAccept((result) -> System.out.println("price: " + result))
        .exceptionally((e) -> {
            e.printStackTrace();
            return null;
        });

执行了一个异步任务,然后对他进行了加工(thenApply),对结果进行了接受(thenAccept),对异常进行了处理(exceptionally)。

CompletableFuture中的方法众多,下面按照类别来进行介绍。

# 创建类

  • completeFuture。可以用于创建默认返回值
  • runAsync。异步执行,无返回值
  • supplyAsync。异步执行,有返回值
  • anyOf。任意一个执行完成,就可以进行下一步动作
  • allOf。全部完成所有任务,才可以进行下一步任务

代码演示:

        // 直接值
        CompletableFuture<String> f1 = CompletableFuture.completedFuture("hello");
        // 无返回值
        CompletableFuture<Void> f2 = CompletableFuture.runAsync(() -> System.out.println(1));
        // 有返回值
        CompletableFuture<Integer> f3 = CompletableFuture.supplyAsync(() -> 100);
        // 并行执行,最早的一个任务完成即返回
        CompletableFuture<Object> anyOf = CompletableFuture.anyOf(f1, f2, f3);
        // 并行执行,所有的任务完成才返回
        CompletableFuture<Void> allOf = CompletableFuture.allOf(f1, f2, f3);

# 状态取值类

  • join。等待合并结果
  • get。合并等待结果,可以增加超时时间。get和join区别:join只会抛出unchecked异常,get会返回具体的异常
  • getNow。如果结果计算完成或者异常了,则返回结果或异常;否则,返回给定的默认值
  • isCancelled。是否已取消
  • isCompletedExceptionally。是否执行异常
  • isDone。是否执行完成

# 控制类

  • complete。直接完成任务,让get可以直接获取到值
  • completeExceptionally。直接抛出异常,结束任务执行
  • cancel。取消任务

# 接续类

  • thenApply, thenApplyAsync。对执行结果进行再包装
  • thenAccept, thenAcceptAsync。对执行结果进行接受和利用
  • thenRun, thenRunAsync。对执行结果下一步的执行
  • thenCompose, thenComposeAsync。组合另一个任务
  • whenComplete, whenCompleteAsync。阶段完成时(包括成功和异常),对结果进行处理
  • handle, handleAsync。阶段完成时(包括成功和异常),对结果进行处理。和whenComplete的区别是它会再返回一个结果
  • exceptionally。对异常结果进行处理
  • thenCombine, thenCombineAsync。把任务和另一个任务进行组合,当两个任务都执行完成后,返回结果。有入参,有返回值
  • thenAcceptBoth, thenAcceptBothAsync。把任务和另一个任务进行组合,当两个任务都执行完成后,返回结果。有入参,无返回值
  • runAfterBoth, runAfterBothAsync。把任务和另一个任务进行组合,当两个任务都执行完成后,返回结果。无入参,无返回值
  • applyToEither, applyToEitherAsync。把任务和另一个任务进行组合,有其中任何一个执行完成,返回结果。有入参,有返回值
  • acceptEither, acceptEitherAsync。把任务和另一个任务进行组合,有其中任何一个执行完成,返回结果。有入参,无返回值
  • runAfterEither, runAfterEitherAsync。把任务和另一个任务进行组合,有其中任何一个执行完成,返回结果。无入参,无返回值

接续类都提供了一个Async方法,比如thenCombine对应有一个thenCombineAsync,他们的区别在于执行合并操作用到哪个线程?
thenCombine 方法会在 future1 和 future2 都完成后,使用当前线程(即调用 thenCombine 的线程)来执行结果的合并操作。
thenCombineAsync 方法会在 future1 和 future2 都完成后,使用一个异步线程池(如 ForkJoinPool.commonPool())中的线程来执行结果的合并操作。

代码演示:

        CompletableFuture<String> rice = CompletableFuture.supplyAsync(()->{
            System.out.println("开始制作米饭,并获得煮熟的米饭");
            return "煮熟的米饭";
        });
        rice.thenApply(r->"success");
        rice.thenAccept(r->{});
        rice.thenRun(()->{});
        rice.thenCompose(r->CompletableFuture.completedFuture("continue"));
        rice.whenComplete((s,t)->{});
        rice.handle((s,t)->"final");
        rice.exceptionally(t->"final");
        
        //煮米饭的同时呢,我又做了牛奶
        CompletableFuture mike = CompletableFuture.supplyAsync(()->{
            System.out.println("开始热牛奶,并获得加热的牛奶");
            return "加热的牛奶";
        });
        // 我想两个都好了,才吃早饭,thenCombineAsync有入参,有返回值
        mike.thenCombineAsync(rice,(m,r)->{
            System.out.println("我收获了早饭:"+m+","+r);
            return String.valueOf(m) + r;
        });
        // 有入参,无返回值
        mike.thenAcceptBothAsync(rice,(m,r)->{
            System.out.println("我收获了早饭:"+m+","+r);
        });
        // 无入参,无返回值
        mike.runAfterBothAsync(rice,()->{
            System.out.println("我收获了早饭");
        });

        mike.applyToEither(rice,(r)-> "已完成一个任务");
        mike.acceptEither(rice,r->{});
        mike.runAfterEither(rice,()->{});

# 执行线程

默认情况下,CompletableFuture 的执行线程取决于你如何创建和调用它,但它的行为可以通过显式指定线程池来改变。

# 默认执行线程

CompletableFuture 的默认执行线程取决于它的创建方式:

  • 使用 CompletableFuture.runAsync 或 CompletableFuture.supplyAsync:

    • 如果没有指定线程池,任务会提交到 ForkJoinPool.commonPool(),这是 Java 8 引入的一个全局共享的线程池。
    • ForkJoinPool.commonPool() 的线程数默认是 CPU 核心数减一(Runtime.getRuntime().availableProcessors() - 1)。
  • 直接创建 CompletableFuture 并手动完成:

    • 如果你直接创建 CompletableFuture 并调用 complete 或 completeExceptionally,任务的执行线程由调用者决定。

# 改变执行线程

你可以通过显式指定线程池来改变 CompletableFuture 的执行线程。CompletableFuture 提供了以下方法支持自定义线程池:

  • runAsync(Runnable runnable, Executor executor):
    • 使用指定的线程池执行 Runnable 任务。
  • supplyAsync(Supplier<U> supplier, Executor executor):
    • 使用指定的线程池执行 Supplier 任务。

# 示例代码

以下示例展示了如何使用默认线程池和自定义线程池:

默认线程池:

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    System.out.println("任务执行线程: " + Thread.currentThread().getName());
});
future.join(); // 等待任务完成

输出:

任务执行线程: ForkJoinPool.commonPool-worker-1

自定义线程池:

ExecutorService customExecutor = Executors.newFixedThreadPool(2);

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    System.out.println("任务执行线程: " + Thread.currentThread().getName());
}, customExecutor);

future.join(); // 等待任务完成
customExecutor.shutdown(); // 关闭线程池

输出:

任务执行线程: pool-1-thread-1

# 线程池的选择

根据任务的特点,可以选择不同的线程池:

  • ForkJoinPool.commonPool():适合 CPU 密集型任务,默认线程数较少。
  • Executors.newCachedThreadPool():适合短生命周期的任务,线程池会根据需要创建新线程。
  • Executors.newFixedThreadPool(int nThreads):适合需要控制并发数的场景。
  • 自定义线程池:根据具体需求配置线程池参数。

注意事项

  • 线程池管理:如果使用自定义线程池,记得在任务完成后关闭线程池(shutdown() 或 shutdownNow()),以避免资源泄漏。
  • 默认线程池的限制:ForkJoinPool.commonPool() 的线程数有限,不适合 I/O 密集型任务或高并发场景。
  • 异步链式调用:在 CompletableFuture 的链式调用中,可以使用 thenRunAsync、thenApplyAsync 等方法指定后续操作的线程池。

# 链式调用中的线程切换

在 CompletableFuture 的链式调用中,可以通过 *Async 方法显式切换线程池:

ExecutorService executor1 = Executors.newFixedThreadPool(2);
ExecutorService executor2 = Executors.newFixedThreadPool(2);

CompletableFuture.supplyAsync(() -> {
    System.out.println("任务 1 执行线程: " + Thread.currentThread().getName());
    return "Hello";
}, executor1).thenApplyAsync(result -> {
    System.out.println("任务 2 执行线程: " + Thread.currentThread().getName());
    return result + " World";
}, executor2).thenAcceptAsync(finalResult -> {
    System.out.println("任务 3 执行线程: " + Thread.currentThread().getName());
    System.out.println("最终结果: " + finalResult);
}, executor1).join();

executor1.shutdown();
executor2.shutdown();

输出:

任务 1 执行线程: pool-1-thread-1
任务 2 执行线程: pool-2-thread-1
任务 3 执行线程: pool-1-thread-2
最终结果: Hello World

# CompletionService

想象浏览器渲染图片的场景:多个图片下载任务提交到线程池,最先下载成功的图片先展示。
根据上述的知识,使用线程池+Future好像实现不了最先展示逻辑;使用CompletableFuture好像也不符合实际。
这时候就需要CompletionService登场了,它整合了Executor和BlockingQueue的功能,可以让我们获得最先执行完成的任务。

看一下实际的实例:

        public static void main(String[] args) throws Exception {
            int imageSize = 10;
            ExecutorService service = Executors.newFixedThreadPool(imageSize);
            CompletionService<String> completionService = new ExecutorCompletionService<String>(service);
            for (int i = 0; i < imageSize; i++) {
                completionService.submit(new DownloadImg(i));
            }
            renderText();
            for (int i = 0; i < imageSize; i++) {
                // 会一直获取到最新执行完成的任务结果
                renderImg(completionService.take().get());
            }
            service.shutdown();
        }

        private static void renderImg(String s) { ... }

        private static void renderText() { ... }

        private static class DownloadImg implements Callable<String> { ... }

# 总结

本章主要讲了Future和CompletableFuture,他们是一脉相承,在主流的框架中都已经开始使用CompletableFuture来完成异步调用,它确实非常强大。
至于CompletionService,他是给了我们另一种选择,在某些场景下能大大提高了开发的效率。
今天讨论和分享了异步结果调用的三个主要类,希望你有所收获!

祝你变得更强!

编辑 (opens new window)
#Future#CompletableFuture#CompletionService
上次更新: 2025/01/01
Java并发-JDK并发容器
Java并发-Fork Join框架

← Java并发-JDK并发容器 Java并发-Fork Join框架→

最近更新
01
Spring Boot版本新特性
09-15
02
Spring框架版本新特性
09-01
03
Spring Boot开发初体验
08-15
更多文章>
Theme by Vdoing | Copyright © 2018-2025 京ICP备2021021832号-2 | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式