Skip to content

CompletableFuture

更新: 3/4/2026 字数: 0 字

一、概念

CompletableFuture 是 Java 8 引入的异步编程工具,它实现了 Future 接口并扩展了丰富的异步回调能力,解决了传统 Future 只能阻塞获取结果、无法链式调用的痛点。

TIP

我们一直讲,并发编程可以分为三个层面的问题,分别是分工协作互斥

当你关注于任务的时候,你会发现你的视角已经从并发编程的细节中跳出来了,你应用的更多的是现实世界的思维模式,类比的往往是现实世界里的分工,

所以我把线程池、FutureCompletableFutureCompletionService 都列到了分工里面

INFO

对于简单的并行任务,你可以通过「线程池 + Future」的方案来解决;

如果任务之间有聚合关系,无论是 AND 聚合 还是 OR 聚合,都可以通过 CompletableFuture 来解决;

而批量的并行任务,则可以通过 CompletionService 来解决。

简单来说:

  • 传统 Future 像 “一张提货单”,只能阻塞等结果(get())
  • CompletableFuture 像 “智能提货单”,支持异步回调、链式处理、多任务组合,无需阻塞就能处理结果

核心特性

  • 异步执行任务:支持自定义线程池,避免使用默认的 ForkJoinPool 耗尽公共线程
  • 链式调用:通过 thenApply/thenAccept/thenRun 等方法实现结果的流水线处理
  • 异常处理:内置 exceptionally/handle 等方法优雅处理异步任务异常
  • 多任务组合:支持 thenCombine(两个任务合并)、allOf(所有任务完成)、anyOf(任一任务完成)等组合操作

方法后缀说明

  • Apply:使用 Function<T, R> 接口,入参为 (T),返回值为 R,用于转换数据类型
  • Accept:使用 Consumer<T> 接口,入参为 (T),返回值为 void,用于打印结果或写入数据库
  • Run:使用 Runnable 接口,入参为 (),返回值为 void,用于任务结束后发个通知
  • Combine:使用 BiFunction<T, U, R> 接口,入参为 (T, U),返回值为 R,用于合并两个并行任务的结果
  • Compose:使用 Function<T, CompletableFuture<R>> 接口,入参为 (T),返回值为 CompletableFuture<R>,用于链式调用(第一个完再调第二个)

二、方法

2.1 单任务链式处理方法(基于上一个任务结果)

  • thenApply:接收上一步结果,同步转换为新结果(Function<T, R>
  • thenApplyAsync:接收上一步结果,异步转换为新结果(Function<T, R>
  • thenAccept:接收上一步结果,同步消费(无返回)(Consumer<T>
  • thenAcceptAsync:接收上一步结果,异步消费(无返回)(Consumer<T>
  • thenRun:上一步完成后,同步执行无参操作(Runnable)
  • thenRunAsync:上一步完成后,异步执行无参操作(Runnable)
  • thenCompose:接收上一步结果,同步嵌套异步任务(解嵌套)(Function<T, CompletionStage<R>>
  • thenComposeAsync:接收上一步结果,异步嵌套异步任务(解嵌套)(Function<T, CompletionStage<R>>

2.2 双任务组合方法(两个任务都完成后执行)

  • thenCombine:两个任务都完成后,同步合并结果并返回新值(BiFunction<T, U, R>
  • thenCombineAsync:两个任务都完成后,异步合并结果并返回新值(BiFunction<T, U, R>
  • thenAcceptBoth:两个任务都完成后,同步消费两个结果(无返回)(BiConsumer<T, U>
  • thenAcceptBothAsync:两个任务都完成后,异步消费两个结果(无返回)(BiConsumer<T, U>
  • runAfterBoth:两个任务都完成后,同步执行无参操作(Runnable)
  • runAfterBothAsync:两个任务都完成后,异步执行无参操作(Runnable)

2.3 双任务竞争方法(任一任务完成后执行)

  • applyToEither:任一任务完成后,同步处理获胜任务的结果并返回新值(Function<T, R>
  • applyToEitherAsync:任一任务完成后,异步处理获胜任务的结果并返回新值(Function<T, R>
  • acceptEither:任一任务完成后,同步消费获胜任务的结果(无返回)(Consumer<T>
  • acceptEitherAsync:任一任务完成后,异步消费获胜任务的结果(无返回)(Consumer<T>
  • runAfterEither:任一任务完成后,同步执行无参操作(Runnable)
  • runAfterEitherAsync:任一任务完成后,异步执行无参操作(Runnable)

2.4 核心共性:同步 vs 异步版本

所有方法的 Async 后缀版本核心区别是执行线程:

  • Async(同步):默认和上一个任务在同一个线程执行(如果上一个任务已完成,则在调用该方法的线程执行)
  • Async(异步):在默认的 ForkJoinPool 或自定义线程池(方法重载可传入)中执行,不阻塞当前线程

2.5 方法选择总结

  • 单任务链式thenApply(转换结果)、thenAccept(消费结果)、thenRun(无参操作)、thenCompose(解嵌套异步)是核心,Async 版本用于耗时逻辑
  • 双任务都完成thenCombine(合并结果返回)、thenAcceptBoth(消费两个结果)、runAfterBoth(无参收尾),适合 "需要两个结果汇总" 的场景
  • 双任务任一完成applyToEither(转换获胜结果)、acceptEither(消费获胜结果)、runAfterEither(无参收尾),适合 "取最快结果" 的场景
  • Async 后缀:核心是切换执行线程,异步版本避免阻塞当前线程,适合耗时操作

三、可运行示例

示例 1:基础异步执行(替代传统 Future)

关键说明:

  • runAsync:适合无返回值的异步任务,参数是 Runnable
  • supplyAsync:适合有返回值的异步任务,参数是 Supplier<T>
  • 默认使用 ForkJoinPool.commonPool() 线程池,也可自定义线程池(示例 2 演示)
java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1. 异步执行无返回值任务(runAsync)
        CompletableFuture<Void> voidFuture = CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(1000); // 模拟耗时操作
                System.out.println("无返回值异步任务执行完成(线程:" + Thread.currentThread().getName() + ")");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        voidFuture.get(); // 阻塞等待完成(仅演示,实际不推荐)

        // 2. 异步执行有返回值任务(supplyAsync)
        CompletableFuture<String> supplyFuture = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
                return "有返回值的异步结果";
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
        String result = supplyFuture.get();
        System.out.println("获取异步结果:" + result);
    }
}

输出结果:

无返回值异步任务执行完成(线程:ForkJoinPool.commonPool-worker-1)
获取异步结果:有返回值的异步结果

示例 2:链式调用 + 自定义线程池

关键说明:

  • thenApply:接收上一步结果,返回新结果(Function<T, R>)
  • thenAccept:消费上一步结果,无返回(Consumer<T>)
  • thenRun:无参数、无返回,仅执行收尾操作(Runnable)
  • 传入自定义线程池,避免默认线程池被耗尽
java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletableFutureChainDemo {
    // 自定义线程池(推荐,避免占用公共线程池)
    private static final ExecutorService CUSTOM_POOL = Executors.newFixedThreadPool(3);

    public static void main(String[] args) throws Exception {
        // 链式处理:异步计算 → 转换结果 → 消费结果
        CompletableFuture<Void> future = CompletableFuture
                // 第一步:异步计算(自定义线程池)
                .supplyAsync(() -> {
                    System.out.println("第一步:计算数值(线程:" + Thread.currentThread().getName() + ")");
                    return 100;
                }, CUSTOM_POOL)
                // 第二步:转换结果(thenApply 接收上一步结果,返回新值)
                .thenApply(num -> {
                    System.out.println("第二步:数值翻倍(线程:" + Thread.currentThread().getName() + ")");
                    return num * 2;
                })
                // 第三步:消费结果(thenAccept 接收结果,无返回值)
                .thenAccept(finalNum -> {
                    System.out.println("第三步:最终结果 = " + finalNum);
                })
                // 第四步:完成后执行(thenRun 无参数,无返回值)
                .thenRun(() -> {
                    System.out.println("所有步骤执行完成");
                });

        future.get();
        CUSTOM_POOL.shutdown(); // 关闭线程池
    }
}

输出结果:

第一步:计算数值(线程:pool-1-thread-1)
第二步:数值翻倍(线程:pool-1-thread-2)
第三步:最终结果 = 200
所有步骤执行完成

示例 3:异常处理

关键说明:

  • exceptionally:仅在任务异常时执行,返回默认值
  • handle:无论正常 / 异常都会执行,同时接收结果和异常,更灵活
java
import java.util.concurrent.CompletableFuture;

public class CompletableFutureExceptionDemo {
    public static void main(String[] args) throws Exception {
        CompletableFuture<Integer> future = CompletableFuture
                .supplyAsync(() -> {
                    // 模拟异常
                    int a = 1 / 0;
                    return 100;
                })
                // 方式1:exceptionally 捕获异常,返回默认值
                .exceptionally(e -> {
                    System.out.println("捕获异常:" + e.getMessage());
                    return 0; // 异常时返回默认值
                });

        System.out.println("最终结果:" + future.get());

        // 方式2:handle 同时处理正常结果和异常
        CompletableFuture<String> handleFuture = CompletableFuture
                .supplyAsync(() -> {
                    return 200;
                })
                .handle((result, e) -> {
                    if (e != null) {
                        return "处理异常:" + e.getMessage();
                    } else {
                        return "正常结果:" + result;
                    }
                });
        System.out.println(handleFuture.get());
    }
}

输出结果:

捕获异常:/ by zero
最终结果:0
正常结果:200

示例 4:多任务组合

关键说明:

  • thenCombine:组合两个异步任务,接收两个任务的结果并处理
  • allOf:等待所有任务完成,适合 “并行执行多个任务,全部完成后汇总”
  • anyOf:等待第一个完成的任务,适合 “多源获取数据,取最快的一个”

代码

java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class CompletableFutureCombineDemo {
    public static void main(String[] args) throws Exception {
        // 任务1:获取商品价格
        CompletableFuture<Integer> priceFuture = CompletableFuture.supplyAsync(() -> {
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) {}
            return 100;
        });

        // 任务2:获取商品折扣
        CompletableFuture<Double> discountFuture = CompletableFuture.supplyAsync(() -> {
            try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) {}
            return 0.8;
        });

        // 组合任务1和2:计算最终价格(thenCombine)
        CompletableFuture<Double> finalPriceFuture = priceFuture
                .thenCombine(discountFuture, (price, discount) -> price * discount);
        System.out.println("组合结果(最终价格):" + finalPriceFuture.get());

        // 多任务并行:allOf(所有任务完成)、anyOf(任一任务完成)
        CompletableFuture<String> taskA = CompletableFuture.supplyAsync(() -> {
            try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) {}
            return "任务A完成";
        });
        CompletableFuture<String> taskB = CompletableFuture.supplyAsync(() -> {
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) {}
            return "任务B完成";
        });

        // allOf:等待所有任务完成(无返回值,需手动获取每个结果)
        CompletableFuture<Void> allTasks = CompletableFuture.allOf(taskA, taskB);
        allTasks.get();
        System.out.println("allOf - 任务A结果:" + taskA.get());
        System.out.println("allOf - 任务B结果:" + taskB.get());

        // anyOf:等待任一任务完成(返回第一个完成的结果)
        CompletableFuture<Object> anyTask = CompletableFuture.anyOf(taskA, taskB);
        System.out.println("anyOf - 第一个完成的任务:" + anyTask.get());
    }
}

输出结果:

组合结果(最终价格):80.0
allOf - 任务A结果:任务A完成
allOf - 任务B结果:任务B完成
anyOf - 第一个完成的任务:任务B完成

三、总结

  • 核心定位:CompletableFuture 是 Java 8 异步编程的核心工具,弥补了传统 Future 只能阻塞获取结果的缺陷
  • 核心能力:支持异步执行、链式回调、异常处理、多任务组合,无需手动管理线程
  • 最佳实践:优先使用自定义线程池(避免占用公共线程池),通过 exceptionally/handle 处理异常,用 allOf/anyOf 实现多任务并行

四、ExecutorCompletionService

ExecutorCompletionService 并不是一个独立的线程池实现,而是对 Executor(通常是 ThreadPoolExecutor)的包装增强,核心解决了「如何按任务完成顺序获取异步任务结果」的问题。

4.1 核心设计思路

  • 复用线程池:底层依赖你传入的 Executor 执行任务,自身不管理线程
  • 阻塞队列缓存结果:内部维护一个 BlockingQueue(默认是 LinkedBlockingQueue),每个任务执行完成后,其结果(Future)会被自动放入队列
  • 按完成顺序取结果:通过 take()/poll() 方法从队列中获取已完成的任务结果,谁先完成就先获取谁,而非按任务提交顺序

4.2 对比原生 ExecutorService 的痛点

原生 ExecutorService.submit() 返回的 Future,调用 get() 会阻塞直到该任务完成,如果遍历多个 Future 调用 get(),会「慢等快」(先提交的慢任务会阻塞后提交的快任务的结果获取)。

ExecutorCompletionService 则完美解决这个问题,始终能拿到当前已完成的任务结果。

4.3 用法示例

模拟多任务并行处理(比如批量下载文件、批量计算),按任务完成顺序输出结果,而非提交顺序。

java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class CompletionServiceDemo {
    public static void main(String[] args) {
        // 1. 创建核心线程池(底层执行任务的线程池)
        ExecutorService executor = Executors.newFixedThreadPool(3);
        
        // 2. 包装线程池,创建 CompletionService
        ExecutorCompletionService<Integer> completionService = new ExecutorCompletionService<>(executor);

        // 3. 提交多个异步任务(任务执行时间不同)
        int taskCount = 5;
        for (int i = 0; i < taskCount; i++) {
            int taskId = i;
            // 提交任务:模拟不同执行耗时的任务(taskId 越大,耗时越长)
            completionService.submit(() -> {
                long sleepTime = (taskId + 1) * 1000; // 1s, 2s, 3s, 4s, 5s
                Thread.sleep(sleepTime);
                System.out.println("任务" + taskId + "执行完成,耗时" + sleepTime + "ms");
                return taskId; // 任务结果返回 taskId
            });
        }

        // 4. 按「完成顺序」获取任务结果
        List<Integer> completedResults = new ArrayList<>();
        try {
            for (int i = 0; i < taskCount; i++) {
                // take():阻塞等待队列中有已完成的任务,返回其 Future
                // 也可以用 poll(long timeout, TimeUnit unit) 设置超时
                Future<Integer> completedFuture = completionService.take();
                // 获取结果(此时任务已完成,get() 不会阻塞)
                Integer result = completedFuture.get();
                completedResults.add(result);
                System.out.println("获取到已完成任务的结果:" + result);
            }
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            // 5. 关闭线程池(必须操作,否则程序不会退出)
            executor.shutdown();
        }

        // 输出最终结果列表(按完成顺序)
        System.out.println("所有任务完成,结果顺序:" + completedResults);
    }
}

运行结果(示例)

text
任务0执行完成,耗时1000ms
获取到已完成任务的结果:0
任务1执行完成,耗时2000ms
获取到已完成任务的结果:1
任务2执行完成,耗时3000ms
获取到已完成任务的结果:2
任务3执行完成,耗时4000ms
获取到已完成任务的结果:3
任务4执行完成,耗时5000ms
获取到已完成任务的结果:4
所有任务完成,结果顺序:[0, 1, 2, 3, 4]

4.4 关键方法解释

方法作用
submit(Callable<T> task)提交异步任务,任务完成后将 Future 放入内部队列
take()阻塞等待,直到队列中有已完成的任务 Future,返回该 Future
poll()非阻塞获取,若队列无已完成任务,直接返回 null
poll(long timeout, TimeUnit unit)带超时的获取,超时后返回 null

4.5 进阶场景:超时控制 + 异常处理

java
import java.util.concurrent.*;

public class CompletionServiceAdvanceDemo {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        ExecutorCompletionService<String> completionService = new ExecutorCompletionService<>(executor);

        // 提交3个任务(包含一个会抛出异常的任务)
        completionService.submit(() -> {
            Thread.sleep(1000);
            return "任务A-成功";
        });
        completionService.submit(() -> {
            Thread.sleep(2000);
            throw new RuntimeException("任务B-执行失败"); // 模拟异常
        });
        completionService.submit(() -> {
            Thread.sleep(3000);
            return "任务C-成功";
        });

        // 按完成顺序获取,设置总超时时间
        int taskCount = 3;
        long timeout = 4000; // 总超时4秒
        long start = System.currentTimeMillis();
        try {
            for (int i = 0; i < taskCount; i++) {
                // 剩余超时时间 = 总超时 - 已用时间
                long remaining = timeout - (System.currentTimeMillis() - start);
                if (remaining <= 0) {
                    System.out.println("超时,剩余任务未处理");
                    break;
                }
                // 带超时获取已完成任务
                Future<String> future = completionService.poll(remaining, TimeUnit.MILLISECONDS);
                if (future == null) {
                    System.out.println("获取任务结果超时");
                    break;
                }
                // 处理结果(捕获任务执行异常)
                try {
                    String result = future.get();
                    System.out.println("成功获取结果:" + result);
                } catch (ExecutionException e) {
                    System.out.println("任务执行异常:" + e.getCause().getMessage());
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // 恢复中断状态
            System.out.println("线程被中断");
        } finally {
            executor.shutdownNow(); // 强制关闭(未完成的任务中断)
        }
    }
}

运行结果(示例)

text
成功获取结果:任务A-成功
任务执行异常:任务B-执行失败
成功获取结果:任务C-成功

4.6 总结

  • 核心作用:ExecutorCompletionService 包装 Executor,通过内部阻塞队列实现「按任务完成顺序获取结果」,解决原生 Future 「慢等快」的问题
  • 核心方法:submit() 提交任务,take()(阻塞)/poll()(非阻塞)获取已完成任务的 Future
  • 最佳实践:使用时需注意关闭底层线程池,处理任务执行异常和超时,避免资源泄漏
  • 典型应用场景:批量异步任务处理(如批量接口调用、批量数据计算)、需要优先处理已完成任务的场景(如多节点监控结果汇总)