深浅模式
CountDownLatch
更新: 3/4/2026 字数: 0 字
在 JDK 8 中,CountDownLatch 是一个非常经典的同步辅助类,它的核心思想是:让一个或多个线程等待,直到其他线程完成一组操作。
它的实现原理可以概括为一句话:基于 AQS(AbstractQueuedSynchronizer)的共享模式实现的计数器
CountDownLatch 的实现原理
原理
- 核心机制:AQS 共享锁
- CountDownLatch 内部定义了一个静态内部类 Sync,它继承了 AQS
- State 的含义:在 CountDownLatch 中,AQS 的同步状态 state 被用作计数器。如果你初始化 new CountDownLatch(5),那么 state 的初始值就是 5
- 共享模式:它使用的是 AQS 的共享模式(Shared Mode)。这意味着当条件满足(计数器归零)时,所有在队列中等待的线程都会被唤醒
- 主要方法的工作原理
① 构造函数
当你创建实例时,实际上是设置了 AQS 的 state:
java
public CountDownLatch(int count) {
this.sync = new Sync(count); // 设置 AQS 的 state = count
}② await() —— 等待归零
- 当线程调用 await() 时,它会尝试获取共享锁
- 判断条件:AQS 的 tryAcquireShared 方法会被触发。如果 state == 0,返回 1(成功);如果 state > 0,返回 -1(失败)
- 阻塞逻辑:如果返回 -1,该线程就会被封装成 Node 节点放入 AQS 的等待队列中,并挂起(LockSupport.park),进入等待状态
③ countDown() —— 计数减一
- 每当一个线程调用 countDown(),它会释放一次共享锁
- 原子减一:通过 CAS(Compare And Swap)操作将 state 减 1
- 唤醒动作:如果减完后 state 变为 0,说明“大门已开”,此时会调用 doReleaseShared() 方法,唤醒 AQS 队列中所有因 await() 而阻塞的线程
- 为什么它是“一次性”的? 这是 CountDownLatch 与 CyclicBarrier 最显著的区别:
- 不可重置:CountDownLatch 没有提供修改或重置 state 的方法。一旦 state 变为 0,后续再调用 await() 的线程会直接通过,不会被阻塞
- 状态不可逆:计数器只能减不能增
总结 CountDownLatch 的实现非常精简,它巧妙地利用了 AQS 队列同步器来管理线程的阻塞和唤醒。
- 优点:性能极高,因为它是基于硬件级别的 CAS 操作和轻量级的 LockSupport 实现的
- 缺点:计数器无法复用
- 核心要点总结
| 特性 | 说明 |
|---|---|
| 一次性 | 计数器不能重置(如果需要重置,请考虑使用 CyclicBarrier) |
| 共享锁 | 多个线程可以同时调用 await() 并在计数器归零时同时被唤醒 |
| 线程安全 | 依赖 AQS 的 CAS 操作保证 state 修改的原子性 |
常用方案演示
模拟运动员起跑
在这个例子中,我们模拟 3 名运动员等待裁判鸣枪。裁判鸣枪后(计数器归零),所有运动员同时起跑
java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CountDownLatchExample {
public static void main(String[] args) throws InterruptedException {
int runnerCount = 3;
// 计数器初始化为 1
CountDownLatch latch = new CountDownLatch(1);
ExecutorService executor = Executors.newFixedThreadPool(runnerCount);
for (int i = 1; i <= runnerCount; i++) {
final int id = i;
executor.submit(() -> {
try {
System.out.println("运动员 " + id + " 已就位,等待鸣枪...");
// 运动员线程在此阻塞,等待 latch 变为 0
latch.await();
System.out.println("运动员 " + id + " 开始冲刺!");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// 模拟裁判准备工作
Thread.sleep(2000);
System.out.println("裁判:各就位,预备——跑!");
// 计数器减 1,变为 0,唤醒所有等待的运动员
latch.countDown();
executor.shutdown();
}
}源码解析
过程拆解(源码视角)
CountDownLatch 的核心逻辑是基于 AQS (AbstractQueuedSynchronizer) 的共享锁模式实现的。
核心内部类:Sync
- 它内部定义了一个继承自 AQS 的静态内部类 Sync
- 状态变量 state:在 AQS 中,state 被用来表示计数器的当前值
- 初始化:当你执行 new CountDownLatch(count) 时,实际上是设置了 AQS 的 state 为 count
关键方法源码分析
1. await() —— 等待倒计时结束
- 当线程调用 await() 时,它会尝试获取共享锁
java
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}- 在 AQS 内部,它会调用 tryAcquireShared。对于 CountDownLatch 来说:
- 如果 state == 0,返回 1(获取锁成功,线程继续执行)
- 如果 state != 0,返回 -1(获取锁失败,线程进入 AQS 队列阻塞等待)
2. countDown() —— 计数减一
- 每当调用一次 countDown(),计数器减 1
java
public void countDown() {
sync.releaseShared(1);
}- 在 AQS 内部,它会调用 tryReleaseShared。对于 CountDownLatch 来说:
- 每次调用 countDown(),都会尝试释放一次共享锁
- 如果释放成功(state 减 1 后为 0),则会调用 doReleaseShared() 唤醒所有等待的线程
源码
CountDownLatch
java
package java.util.concurrent;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
/**
* A synchronization aid that allows one or more threads to wait until
* a set of operations being performed in other threads completes.
*
* @since 1.5
* @author Doug Lea
*/
public class CountDownLatch {
/**
* Synchronization control For CountDownLatch.
* Uses AQS state to represent count.
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
private final Sync sync;
/**
* Constructs a {@code CountDownLatch} initialized with the given count.
*
* @param count the number of times {@link #countDown} must be invoked
* before threads can pass through {@link #await}
* @throws IllegalArgumentException if {@code count} is negative
*/
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
/**
* Causes the current thread to wait until the latch has counted down to
* zero, unless the thread is {@linkplain Thread#interrupt interrupted}.
*
* <p>If the current count is zero then this method returns immediately.
*
* <p>If the current count is greater than zero then the current
* thread becomes disabled for thread scheduling purposes and lies
* dormant until one of two things happen:
* <ul>
* <li>The count reaches zero due to invocations of the
* {@link #countDown} method; or
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* the current thread.
* </ul>
*
* <p>If the current thread:
* <ul>
* <li>has its interrupted status set on entry to this method; or
* <li>is {@linkplain Thread#interrupt interrupted} while waiting,
* </ul>
* then {@link InterruptedException} is thrown and the current thread's
* interrupted status is cleared.
*
* @throws InterruptedException if the current thread is interrupted
* while waiting
*/
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
/**
* Causes the current thread to wait until the latch has counted down to
* zero, unless the thread is {@linkplain Thread#interrupt interrupted},
* or the specified waiting time elapses.
*
* <p>If the current count is zero then this method returns immediately
* with the value {@code true}.
*
* <p>If the current count is greater than zero then the current
* thread becomes disabled for thread scheduling purposes and lies
* dormant until one of three things happen:
* <ul>
* <li>The count reaches zero due to invocations of the
* {@link #countDown} method; or
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* the current thread; or
* <li>The specified waiting time elapses.
* </ul>
*
* <p>If the count reaches zero then the method returns with the
* value {@code true}.
*
* <p>If the current thread:
* <ul>
* <li>has its interrupted status set on entry to this method; or
* <li>is {@linkplain Thread#interrupt interrupted} while waiting,
* </ul>
* then {@link InterruptedException} is thrown and the current thread's
* interrupted status is cleared.
*
* <p>If the specified waiting time elapses then the value {@code false}
* is returned. If the time is less than or equal to zero, the method
* will not wait at all.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the {@code timeout} argument
* @return {@code true} if the count reached zero and {@code false}
* if the waiting time elapsed before the count reached zero
* @throws InterruptedException if the current thread is interrupted
* while waiting
*/
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
/**
* Decrements the count of the latch, releasing all waiting threads if
* the count reaches zero.
*
* <p>If the current count is greater than zero then it is decremented.
* If the new count is zero then all waiting threads are re-enabled for
* thread scheduling purposes.
*
* <p>If the current count equals zero then nothing happens.
*/
public void countDown() {
sync.releaseShared(1);
}
/**
* Returns the current count.
*
* <p>This method is typically used for debugging and testing purposes.
*
* @return the current count
*/
public long getCount() {
return sync.getCount();
}
/**
* Returns a string identifying this latch, as well as its state.
* The state, in brackets, includes the String {@code "Count ="}
* followed by the current count.
*
* @return a string identifying this latch, as well as its state
*/
public String toString() {
return super.toString() + "[Count = " + sync.getCount() + "]";
}
}