专业的JAVA编程教程与资源

网站首页 > java教程 正文

Java8已经发布7年了,不会还有人没用过CompletableFuture吧

temp10 2024-09-07 20:13:26 java教程 9 ℃ 0 评论

前言

CompletableFuture实现了CompletionStage接口和Future接口,前者是对后者的一个扩展,增加了异步回调、流式处理、多个Future组合处理的能力,使Java在处理多任务的协同工作时更加顺畅便利。

Future有哪些缺陷?

多线程的场景下,我们要监听每个线程异步执行的结果,如果用Future去实现,代码如下:

Java8已经发布7年了,不会还有人没用过CompletableFuture吧

/**
 * Future异步示例
 *
 * @author liudong
 * @date 2021/12/9 15:37
 */
public class FutureDemo {
    public static void main(String[] args) throws Exception {
        // 1. 创建线程池
        ExecutorService executorService = Executors.newCachedThreadPool();
        // 2. 提交任务
        List<Callable<String>> tasks = new ArrayList<>();
        ArrayList<Integer> taskIds = Lists.newArrayList(1, 2, 3);
        taskIds.forEach(taskId -> {
            Callable<String> task = () -> {
                Thread.sleep(1000);
                return "任务" + taskId + "执行完成!";
            };
            tasks.add(task);
        });
        List<Future<String>> futures = executorService.invokeAll(tasks);
        // 3. 获取任务执行结果
        for (Future<String> future : futures) {
            String result = future.get();
            System.out.println(result);
        }
        executorService.shutdown();
    }
}

执行结果

任务1执行完毕!
任务2执行完毕!
任务3执行完毕!

大家有没有思考过这样使用有没有什么问题?笔者认为有如下几个缺陷:

  • (1) 不能回调会阻塞
  • (2) 批量任务处理彼此依赖会阻塞
  • (3) 不能多个任务级联执行,将结果依次往下传递
  • (4) 得不到最先完成的任务

CompletableFuture能解决吗?

针对如上几个问题,看看CompletableFuture是怎么解决的。

使用CompletableFuture提供的supplyAsync和whenCompleteAsync两个方法优化以上代码,如下:

/**
 * CompletableFuture异步示例
 *
 * @author liudong
 * @date 2021/12/9 15:37
 */
public class CompletableFutureDemo {
    public static void main(String[] args) throws Exception {
        // 1. 创建线程池
        ExecutorService executorService = Executors.newCachedThreadPool();
        // 2. 提交任务
        List<Callable<String>> tasks = new ArrayList<>();
        ArrayList<Integer> taskIds = Lists.newArrayList(1, 2, 3);
        // 3. 回调任务执行结果
        List<CompletableFuture<String>> completableFutures = new ArrayList<>();
        CompletableFuture[] cfs = taskIds.stream().map((taskId) -> {
            CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                }
                return "任务" + taskId + "执行完成!";
            }, executorService);
            // 异步返回执行结果
            completableFuture.whenCompleteAsync((result, exception) -> {
                System.out.println(result);
            });
            // 将处理结果传递到子任务
            completableFuture.thenAccept((result) -> {
                System.out.println("上级任务处理结果:" + result);
            });
            return completableFuture;
        }).toArray(CompletableFuture[]::new);
        
        // 获取最先执行完的任务
        CompletableFuture<Object> firstEnd = CompletableFuture.anyOf(cfs);
        System.out.println("最先执行完的任务:" + firstEnd.get());
        executorService.shutdown();
    }
}

执行结果:

上级任务处理结果:任务1执行完成!
上级任务处理结果:任务3执行完成!
最先执行完的任务:任务1执行完成!
上级任务处理结果:任务2执行完成!
任务3执行完成!
任务2执行完成!
任务1执行完成!

以上可以看出,执行结果是异步打印,不会阻塞,也不会顺序依赖,能获取上级任务执行结果,并能够获取到最先执行完的任务。

扩展知识点:

  • (1) 创建异步操作:
    • runAsync:不支持返回值
    • supplyAsync:支持返回值
  • (2) 计算结果完成时的回调方法:
    • whenComplete:执行完当前任务的线程,继续执行 whenComplete 的任务。
    • whenCompleteAsync:执行完当前任务的线程,把whenCompleteAsync 的任务继续提交给线程池来执行。
    • exceptionally:当前任务出现异常时,执行exceptionally中的回调方法。
  • (3) 线程串行化:
    • thenApply:当一个线程依赖另一个线程时,可以使用 thenApply 方法来把这两个线程串行化。
    • thenAccept 消费处理结果,接收任务的处理结果,并消费处理,无返回结果。
    • thenRun:跟 thenAccept 方法不一样的是,不关心任务的处理结果。只要上面的任务执行完成,就开始执行 thenAccept 。
    • handle:执行任务完成时,handle可以对结果进行处理。handle 方法和 thenApply 方法处理方式基本一样。不同的是 handle 是在任务完成后再执行,还可以处理异常的任务。thenApply 只可以执行正常的任务,任务出现异常则不执行 thenApply 方法。
  • (4) 合并任务
    • thenCombine:用于合并任务,thenCombine 会把 两个 CompletionStage 的任务都执行完成后,把两个任务的结果一块交给 thenCombine 来处理。
    • thenCompose:thenCompose 方法允许你对两个 CompletionStage 进行流水线操作,第一个操作完成时,将其结果作为参数传递给第二个操作。

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表