Skip to content

CountDownLatch

更新: 3/4/2026 字数: 0 字

在 JDK 8 中,CountDownLatch 是一个非常经典的同步辅助类,它的核心思想是:让一个或多个线程等待,直到其他线程完成一组操作。

它的实现原理可以概括为一句话:基于 AQS(AbstractQueuedSynchronizer)的共享模式实现的计数器

CountDownLatch 的实现原理

原理

  1. 核心机制:AQS 共享锁
  • CountDownLatch 内部定义了一个静态内部类 Sync,它继承了 AQS
  • State 的含义:在 CountDownLatch 中,AQS 的同步状态 state 被用作计数器。如果你初始化 new CountDownLatch(5),那么 state 的初始值就是 5
  • 共享模式:它使用的是 AQS 的共享模式(Shared Mode)。这意味着当条件满足(计数器归零)时,所有在队列中等待的线程都会被唤醒
  1. 主要方法的工作原理

① 构造函数

当你创建实例时,实际上是设置了 AQS 的 state:

java
public CountDownLatch(int count) {
    this.sync = new Sync(count); // 设置 AQS 的 state = count
}

② await() —— 等待归零

  • 当线程调用 await() 时,它会尝试获取共享锁
  • 判断条件:AQS 的 tryAcquireShared 方法会被触发。如果 state == 0,返回 1(成功);如果 state > 0,返回 -1(失败)
  • 阻塞逻辑:如果返回 -1,该线程就会被封装成 Node 节点放入 AQS 的等待队列中,并挂起(LockSupport.park),进入等待状态

③ countDown() —— 计数减一

  • 每当一个线程调用 countDown(),它会释放一次共享锁
  • 原子减一:通过 CAS(Compare And Swap)操作将 state 减 1
  • 唤醒动作:如果减完后 state 变为 0,说明“大门已开”,此时会调用 doReleaseShared() 方法,唤醒 AQS 队列中所有因 await() 而阻塞的线程
  1. 为什么它是“一次性”的? 这是 CountDownLatch 与 CyclicBarrier 最显著的区别:
  • 不可重置:CountDownLatch 没有提供修改或重置 state 的方法。一旦 state 变为 0,后续再调用 await() 的线程会直接通过,不会被阻塞
  • 状态不可逆:计数器只能减不能增

总结 CountDownLatch 的实现非常精简,它巧妙地利用了 AQS 队列同步器来管理线程的阻塞和唤醒。

  • 优点:性能极高,因为它是基于硬件级别的 CAS 操作和轻量级的 LockSupport 实现的
  • 缺点:计数器无法复用
  1. 核心要点总结
特性说明
一次性计数器不能重置(如果需要重置,请考虑使用 CyclicBarrier)
共享锁多个线程可以同时调用 await() 并在计数器归零时同时被唤醒
线程安全依赖 AQS 的 CAS 操作保证 state 修改的原子性

常用方案演示

模拟运动员起跑

在这个例子中,我们模拟 3 名运动员等待裁判鸣枪。裁判鸣枪后(计数器归零),所有运动员同时起跑

java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CountDownLatchExample {
    public static void main(String[] args) throws InterruptedException {
        int runnerCount = 3;
        // 计数器初始化为 1
        CountDownLatch latch = new CountDownLatch(1);
        ExecutorService executor = Executors.newFixedThreadPool(runnerCount);

        for (int i = 1; i <= runnerCount; i++) {
            final int id = i;
            executor.submit(() -> {
                try {
                    System.out.println("运动员 " + id + " 已就位,等待鸣枪...");
                    // 运动员线程在此阻塞,等待 latch 变为 0
                    latch.await(); 
                    System.out.println("运动员 " + id + " 开始冲刺!");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        // 模拟裁判准备工作
        Thread.sleep(2000); 
        System.out.println("裁判:各就位,预备——跑!");
        
        // 计数器减 1,变为 0,唤醒所有等待的运动员
        latch.countDown(); 

        executor.shutdown();
    }
}

源码解析

过程拆解(源码视角)

CountDownLatch 的核心逻辑是基于 AQS (AbstractQueuedSynchronizer) 的共享锁模式实现的。

核心内部类:Sync

  • 它内部定义了一个继承自 AQS 的静态内部类 Sync
  • 状态变量 state:在 AQS 中,state 被用来表示计数器的当前值
  • 初始化:当你执行 new CountDownLatch(count) 时,实际上是设置了 AQS 的 state 为 count

关键方法源码分析

1. await() —— 等待倒计时结束

  • 当线程调用 await() 时,它会尝试获取共享锁
java
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
  • 在 AQS 内部,它会调用 tryAcquireShared。对于 CountDownLatch 来说:
    • 如果 state == 0,返回 1(获取锁成功,线程继续执行)
    • 如果 state != 0,返回 -1(获取锁失败,线程进入 AQS 队列阻塞等待)

2. countDown() —— 计数减一

  • 每当调用一次 countDown(),计数器减 1
java
public void countDown() {
    sync.releaseShared(1);
}
  • 在 AQS 内部,它会调用 tryReleaseShared。对于 CountDownLatch 来说:
    • 每次调用 countDown(),都会尝试释放一次共享锁
    • 如果释放成功(state 减 1 后为 0),则会调用 doReleaseShared() 唤醒所有等待的线程

源码

CountDownLatch

java
package java.util.concurrent;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/**
 * A synchronization aid that allows one or more threads to wait until
 * a set of operations being performed in other threads completes.
 *
 * @since 1.5
 * @author Doug Lea
 */
public class CountDownLatch {
    /**
     * Synchronization control For CountDownLatch.
     * Uses AQS state to represent count.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

    private final Sync sync;

    /**
     * Constructs a {@code CountDownLatch} initialized with the given count.
     *
     * @param count the number of times {@link #countDown} must be invoked
     *        before threads can pass through {@link #await}
     * @throws IllegalArgumentException if {@code count} is negative
     */
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    /**
     * Causes the current thread to wait until the latch has counted down to
     * zero, unless the thread is {@linkplain Thread#interrupt interrupted}.
     *
     * <p>If the current count is zero then this method returns immediately.
     *
     * <p>If the current count is greater than zero then the current
     * thread becomes disabled for thread scheduling purposes and lies
     * dormant until one of two things happen:
     * <ul>
     * <li>The count reaches zero due to invocations of the
     * {@link #countDown} method; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread.
     * </ul>
     *
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * @throws InterruptedException if the current thread is interrupted
     *         while waiting
     */
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    /**
     * Causes the current thread to wait until the latch has counted down to
     * zero, unless the thread is {@linkplain Thread#interrupt interrupted},
     * or the specified waiting time elapses.
     *
     * <p>If the current count is zero then this method returns immediately
     * with the value {@code true}.
     *
     * <p>If the current count is greater than zero then the current
     * thread becomes disabled for thread scheduling purposes and lies
     * dormant until one of three things happen:
     * <ul>
     * <li>The count reaches zero due to invocations of the
     * {@link #countDown} method; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread; or
     * <li>The specified waiting time elapses.
     * </ul>
     *
     * <p>If the count reaches zero then the method returns with the
     * value {@code true}.
     *
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the specified waiting time elapses then the value {@code false}
     * is returned.  If the time is less than or equal to zero, the method
     * will not wait at all.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the {@code timeout} argument
     * @return {@code true} if the count reached zero and {@code false}
     *         if the waiting time elapsed before the count reached zero
     * @throws InterruptedException if the current thread is interrupted
     *         while waiting
     */
    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    /**
     * Decrements the count of the latch, releasing all waiting threads if
     * the count reaches zero.
     *
     * <p>If the current count is greater than zero then it is decremented.
     * If the new count is zero then all waiting threads are re-enabled for
     * thread scheduling purposes.
     *
     * <p>If the current count equals zero then nothing happens.
     */
    public void countDown() {
        sync.releaseShared(1);
    }

    /**
     * Returns the current count.
     *
     * <p>This method is typically used for debugging and testing purposes.
     *
     * @return the current count
     */
    public long getCount() {
        return sync.getCount();
    }

    /**
     * Returns a string identifying this latch, as well as its state.
     * The state, in brackets, includes the String {@code "Count ="}
     * followed by the current count.
     *
     * @return a string identifying this latch, as well as its state
     */
    public String toString() {
        return super.toString() + "[Count = " + sync.getCount() + "]";
    }
}