深浅模式
CompletableFuture
更新: 3/4/2026 字数: 0 字
一、概念
CompletableFuture 是 Java 8 引入的异步编程工具,它实现了 Future 接口并扩展了丰富的异步回调能力,解决了传统 Future 只能阻塞获取结果、无法链式调用的痛点。
TIP
我们一直讲,并发编程可以分为三个层面的问题,分别是分工、协作和互斥,
当你关注于任务的时候,你会发现你的视角已经从并发编程的细节中跳出来了,你应用的更多的是现实世界的思维模式,类比的往往是现实世界里的分工,
所以我把线程池、Future、CompletableFuture 和 CompletionService 都列到了分工里面
INFO
对于简单的并行任务,你可以通过「线程池 + Future」的方案来解决;
如果任务之间有聚合关系,无论是 AND 聚合 还是 OR 聚合,都可以通过 CompletableFuture 来解决;
而批量的并行任务,则可以通过 CompletionService 来解决。
简单来说:
- 传统 Future 像 “一张提货单”,只能阻塞等结果(get())
- CompletableFuture 像 “智能提货单”,支持异步回调、链式处理、多任务组合,无需阻塞就能处理结果
核心特性
- 异步执行任务:支持自定义线程池,避免使用默认的 ForkJoinPool 耗尽公共线程
- 链式调用:通过 thenApply/thenAccept/thenRun 等方法实现结果的流水线处理
- 异常处理:内置 exceptionally/handle 等方法优雅处理异步任务异常
- 多任务组合:支持 thenCombine(两个任务合并)、allOf(所有任务完成)、anyOf(任一任务完成)等组合操作
方法后缀说明
- Apply:使用
Function<T, R>接口,入参为(T),返回值为R,用于转换数据类型 - Accept:使用
Consumer<T>接口,入参为(T),返回值为void,用于打印结果或写入数据库 - Run:使用
Runnable接口,入参为(),返回值为void,用于任务结束后发个通知 - Combine:使用
BiFunction<T, U, R>接口,入参为(T, U),返回值为R,用于合并两个并行任务的结果 - Compose:使用
Function<T, CompletableFuture<R>>接口,入参为(T),返回值为CompletableFuture<R>,用于链式调用(第一个完再调第二个)
二、方法
2.1 单任务链式处理方法(基于上一个任务结果)
thenApply:接收上一步结果,同步转换为新结果(Function<T, R>)thenApplyAsync:接收上一步结果,异步转换为新结果(Function<T, R>)thenAccept:接收上一步结果,同步消费(无返回)(Consumer<T>)thenAcceptAsync:接收上一步结果,异步消费(无返回)(Consumer<T>)thenRun:上一步完成后,同步执行无参操作(Runnable)thenRunAsync:上一步完成后,异步执行无参操作(Runnable)thenCompose:接收上一步结果,同步嵌套异步任务(解嵌套)(Function<T, CompletionStage<R>>)thenComposeAsync:接收上一步结果,异步嵌套异步任务(解嵌套)(Function<T, CompletionStage<R>>)
2.2 双任务组合方法(两个任务都完成后执行)
thenCombine:两个任务都完成后,同步合并结果并返回新值(BiFunction<T, U, R>)thenCombineAsync:两个任务都完成后,异步合并结果并返回新值(BiFunction<T, U, R>)thenAcceptBoth:两个任务都完成后,同步消费两个结果(无返回)(BiConsumer<T, U>)thenAcceptBothAsync:两个任务都完成后,异步消费两个结果(无返回)(BiConsumer<T, U>)runAfterBoth:两个任务都完成后,同步执行无参操作(Runnable)runAfterBothAsync:两个任务都完成后,异步执行无参操作(Runnable)
2.3 双任务竞争方法(任一任务完成后执行)
applyToEither:任一任务完成后,同步处理获胜任务的结果并返回新值(Function<T, R>)applyToEitherAsync:任一任务完成后,异步处理获胜任务的结果并返回新值(Function<T, R>)acceptEither:任一任务完成后,同步消费获胜任务的结果(无返回)(Consumer<T>)acceptEitherAsync:任一任务完成后,异步消费获胜任务的结果(无返回)(Consumer<T>)runAfterEither:任一任务完成后,同步执行无参操作(Runnable)runAfterEitherAsync:任一任务完成后,异步执行无参操作(Runnable)
2.4 核心共性:同步 vs 异步版本
所有方法的 Async 后缀版本核心区别是执行线程:
- 无
Async(同步):默认和上一个任务在同一个线程执行(如果上一个任务已完成,则在调用该方法的线程执行) - 有
Async(异步):在默认的 ForkJoinPool 或自定义线程池(方法重载可传入)中执行,不阻塞当前线程
2.5 方法选择总结
- 单任务链式:
thenApply(转换结果)、thenAccept(消费结果)、thenRun(无参操作)、thenCompose(解嵌套异步)是核心,Async 版本用于耗时逻辑 - 双任务都完成:
thenCombine(合并结果返回)、thenAcceptBoth(消费两个结果)、runAfterBoth(无参收尾),适合 "需要两个结果汇总" 的场景 - 双任务任一完成:
applyToEither(转换获胜结果)、acceptEither(消费获胜结果)、runAfterEither(无参收尾),适合 "取最快结果" 的场景 - Async 后缀:核心是切换执行线程,异步版本避免阻塞当前线程,适合耗时操作
三、可运行示例
示例 1:基础异步执行(替代传统 Future)
关键说明:
runAsync:适合无返回值的异步任务,参数是RunnablesupplyAsync:适合有返回值的异步任务,参数是Supplier<T>- 默认使用
ForkJoinPool.commonPool()线程池,也可自定义线程池(示例 2 演示)
java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 1. 异步执行无返回值任务(runAsync)
CompletableFuture<Void> voidFuture = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(1000); // 模拟耗时操作
System.out.println("无返回值异步任务执行完成(线程:" + Thread.currentThread().getName() + ")");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
voidFuture.get(); // 阻塞等待完成(仅演示,实际不推荐)
// 2. 异步执行有返回值任务(supplyAsync)
CompletableFuture<String> supplyFuture = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
return "有返回值的异步结果";
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
String result = supplyFuture.get();
System.out.println("获取异步结果:" + result);
}
}输出结果:
无返回值异步任务执行完成(线程:ForkJoinPool.commonPool-worker-1)
获取异步结果:有返回值的异步结果示例 2:链式调用 + 自定义线程池
关键说明:
thenApply:接收上一步结果,返回新结果(Function<T, R>)thenAccept:消费上一步结果,无返回(Consumer<T>)thenRun:无参数、无返回,仅执行收尾操作(Runnable)- 传入自定义线程池,避免默认线程池被耗尽
java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CompletableFutureChainDemo {
// 自定义线程池(推荐,避免占用公共线程池)
private static final ExecutorService CUSTOM_POOL = Executors.newFixedThreadPool(3);
public static void main(String[] args) throws Exception {
// 链式处理:异步计算 → 转换结果 → 消费结果
CompletableFuture<Void> future = CompletableFuture
// 第一步:异步计算(自定义线程池)
.supplyAsync(() -> {
System.out.println("第一步:计算数值(线程:" + Thread.currentThread().getName() + ")");
return 100;
}, CUSTOM_POOL)
// 第二步:转换结果(thenApply 接收上一步结果,返回新值)
.thenApply(num -> {
System.out.println("第二步:数值翻倍(线程:" + Thread.currentThread().getName() + ")");
return num * 2;
})
// 第三步:消费结果(thenAccept 接收结果,无返回值)
.thenAccept(finalNum -> {
System.out.println("第三步:最终结果 = " + finalNum);
})
// 第四步:完成后执行(thenRun 无参数,无返回值)
.thenRun(() -> {
System.out.println("所有步骤执行完成");
});
future.get();
CUSTOM_POOL.shutdown(); // 关闭线程池
}
}输出结果:
第一步:计算数值(线程:pool-1-thread-1)
第二步:数值翻倍(线程:pool-1-thread-2)
第三步:最终结果 = 200
所有步骤执行完成示例 3:异常处理
关键说明:
exceptionally:仅在任务异常时执行,返回默认值handle:无论正常 / 异常都会执行,同时接收结果和异常,更灵活
java
import java.util.concurrent.CompletableFuture;
public class CompletableFutureExceptionDemo {
public static void main(String[] args) throws Exception {
CompletableFuture<Integer> future = CompletableFuture
.supplyAsync(() -> {
// 模拟异常
int a = 1 / 0;
return 100;
})
// 方式1:exceptionally 捕获异常,返回默认值
.exceptionally(e -> {
System.out.println("捕获异常:" + e.getMessage());
return 0; // 异常时返回默认值
});
System.out.println("最终结果:" + future.get());
// 方式2:handle 同时处理正常结果和异常
CompletableFuture<String> handleFuture = CompletableFuture
.supplyAsync(() -> {
return 200;
})
.handle((result, e) -> {
if (e != null) {
return "处理异常:" + e.getMessage();
} else {
return "正常结果:" + result;
}
});
System.out.println(handleFuture.get());
}
}输出结果:
捕获异常:/ by zero
最终结果:0
正常结果:200示例 4:多任务组合
关键说明:
thenCombine:组合两个异步任务,接收两个任务的结果并处理allOf:等待所有任务完成,适合 “并行执行多个任务,全部完成后汇总”anyOf:等待第一个完成的任务,适合 “多源获取数据,取最快的一个”
代码
java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class CompletableFutureCombineDemo {
public static void main(String[] args) throws Exception {
// 任务1:获取商品价格
CompletableFuture<Integer> priceFuture = CompletableFuture.supplyAsync(() -> {
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) {}
return 100;
});
// 任务2:获取商品折扣
CompletableFuture<Double> discountFuture = CompletableFuture.supplyAsync(() -> {
try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) {}
return 0.8;
});
// 组合任务1和2:计算最终价格(thenCombine)
CompletableFuture<Double> finalPriceFuture = priceFuture
.thenCombine(discountFuture, (price, discount) -> price * discount);
System.out.println("组合结果(最终价格):" + finalPriceFuture.get());
// 多任务并行:allOf(所有任务完成)、anyOf(任一任务完成)
CompletableFuture<String> taskA = CompletableFuture.supplyAsync(() -> {
try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) {}
return "任务A完成";
});
CompletableFuture<String> taskB = CompletableFuture.supplyAsync(() -> {
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) {}
return "任务B完成";
});
// allOf:等待所有任务完成(无返回值,需手动获取每个结果)
CompletableFuture<Void> allTasks = CompletableFuture.allOf(taskA, taskB);
allTasks.get();
System.out.println("allOf - 任务A结果:" + taskA.get());
System.out.println("allOf - 任务B结果:" + taskB.get());
// anyOf:等待任一任务完成(返回第一个完成的结果)
CompletableFuture<Object> anyTask = CompletableFuture.anyOf(taskA, taskB);
System.out.println("anyOf - 第一个完成的任务:" + anyTask.get());
}
}输出结果:
组合结果(最终价格):80.0
allOf - 任务A结果:任务A完成
allOf - 任务B结果:任务B完成
anyOf - 第一个完成的任务:任务B完成三、总结
- 核心定位:CompletableFuture 是 Java 8 异步编程的核心工具,弥补了传统 Future 只能阻塞获取结果的缺陷
- 核心能力:支持异步执行、链式回调、异常处理、多任务组合,无需手动管理线程
- 最佳实践:优先使用自定义线程池(避免占用公共线程池),通过 exceptionally/handle 处理异常,用 allOf/anyOf 实现多任务并行
四、ExecutorCompletionService
ExecutorCompletionService 并不是一个独立的线程池实现,而是对 Executor(通常是 ThreadPoolExecutor)的包装增强,核心解决了「如何按任务完成顺序获取异步任务结果」的问题。
4.1 核心设计思路
- 复用线程池:底层依赖你传入的 Executor 执行任务,自身不管理线程
- 阻塞队列缓存结果:内部维护一个 BlockingQueue(默认是 LinkedBlockingQueue),每个任务执行完成后,其结果(Future)会被自动放入队列
- 按完成顺序取结果:通过 take()/poll() 方法从队列中获取已完成的任务结果,谁先完成就先获取谁,而非按任务提交顺序
4.2 对比原生 ExecutorService 的痛点
原生 ExecutorService.submit() 返回的 Future,调用 get() 会阻塞直到该任务完成,如果遍历多个 Future 调用 get(),会「慢等快」(先提交的慢任务会阻塞后提交的快任务的结果获取)。
ExecutorCompletionService 则完美解决这个问题,始终能拿到当前已完成的任务结果。
4.3 用法示例
模拟多任务并行处理(比如批量下载文件、批量计算),按任务完成顺序输出结果,而非提交顺序。
java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
public class CompletionServiceDemo {
public static void main(String[] args) {
// 1. 创建核心线程池(底层执行任务的线程池)
ExecutorService executor = Executors.newFixedThreadPool(3);
// 2. 包装线程池,创建 CompletionService
ExecutorCompletionService<Integer> completionService = new ExecutorCompletionService<>(executor);
// 3. 提交多个异步任务(任务执行时间不同)
int taskCount = 5;
for (int i = 0; i < taskCount; i++) {
int taskId = i;
// 提交任务:模拟不同执行耗时的任务(taskId 越大,耗时越长)
completionService.submit(() -> {
long sleepTime = (taskId + 1) * 1000; // 1s, 2s, 3s, 4s, 5s
Thread.sleep(sleepTime);
System.out.println("任务" + taskId + "执行完成,耗时" + sleepTime + "ms");
return taskId; // 任务结果返回 taskId
});
}
// 4. 按「完成顺序」获取任务结果
List<Integer> completedResults = new ArrayList<>();
try {
for (int i = 0; i < taskCount; i++) {
// take():阻塞等待队列中有已完成的任务,返回其 Future
// 也可以用 poll(long timeout, TimeUnit unit) 设置超时
Future<Integer> completedFuture = completionService.take();
// 获取结果(此时任务已完成,get() 不会阻塞)
Integer result = completedFuture.get();
completedResults.add(result);
System.out.println("获取到已完成任务的结果:" + result);
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} finally {
// 5. 关闭线程池(必须操作,否则程序不会退出)
executor.shutdown();
}
// 输出最终结果列表(按完成顺序)
System.out.println("所有任务完成,结果顺序:" + completedResults);
}
}运行结果(示例)
text
任务0执行完成,耗时1000ms
获取到已完成任务的结果:0
任务1执行完成,耗时2000ms
获取到已完成任务的结果:1
任务2执行完成,耗时3000ms
获取到已完成任务的结果:2
任务3执行完成,耗时4000ms
获取到已完成任务的结果:3
任务4执行完成,耗时5000ms
获取到已完成任务的结果:4
所有任务完成,结果顺序:[0, 1, 2, 3, 4]4.4 关键方法解释
| 方法 | 作用 |
|---|---|
submit(Callable<T> task) | 提交异步任务,任务完成后将 Future 放入内部队列 |
take() | 阻塞等待,直到队列中有已完成的任务 Future,返回该 Future |
poll() | 非阻塞获取,若队列无已完成任务,直接返回 null |
poll(long timeout, TimeUnit unit) | 带超时的获取,超时后返回 null |
4.5 进阶场景:超时控制 + 异常处理
java
import java.util.concurrent.*;
public class CompletionServiceAdvanceDemo {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(3);
ExecutorCompletionService<String> completionService = new ExecutorCompletionService<>(executor);
// 提交3个任务(包含一个会抛出异常的任务)
completionService.submit(() -> {
Thread.sleep(1000);
return "任务A-成功";
});
completionService.submit(() -> {
Thread.sleep(2000);
throw new RuntimeException("任务B-执行失败"); // 模拟异常
});
completionService.submit(() -> {
Thread.sleep(3000);
return "任务C-成功";
});
// 按完成顺序获取,设置总超时时间
int taskCount = 3;
long timeout = 4000; // 总超时4秒
long start = System.currentTimeMillis();
try {
for (int i = 0; i < taskCount; i++) {
// 剩余超时时间 = 总超时 - 已用时间
long remaining = timeout - (System.currentTimeMillis() - start);
if (remaining <= 0) {
System.out.println("超时,剩余任务未处理");
break;
}
// 带超时获取已完成任务
Future<String> future = completionService.poll(remaining, TimeUnit.MILLISECONDS);
if (future == null) {
System.out.println("获取任务结果超时");
break;
}
// 处理结果(捕获任务执行异常)
try {
String result = future.get();
System.out.println("成功获取结果:" + result);
} catch (ExecutionException e) {
System.out.println("任务执行异常:" + e.getCause().getMessage());
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // 恢复中断状态
System.out.println("线程被中断");
} finally {
executor.shutdownNow(); // 强制关闭(未完成的任务中断)
}
}
}运行结果(示例)
text
成功获取结果:任务A-成功
任务执行异常:任务B-执行失败
成功获取结果:任务C-成功4.6 总结
- 核心作用:ExecutorCompletionService 包装 Executor,通过内部阻塞队列实现「按任务完成顺序获取结果」,解决原生 Future 「慢等快」的问题
- 核心方法:submit() 提交任务,take()(阻塞)/poll()(非阻塞)获取已完成任务的 Future
- 最佳实践:使用时需注意关闭底层线程池,处理任务执行异常和超时,避免资源泄漏
- 典型应用场景:批量异步任务处理(如批量接口调用、批量数据计算)、需要优先处理已完成任务的场景(如多节点监控结果汇总)
