Skip to content

CyclicBarrier

更新: 3/4/2026 字数: 0 字

Java 并发编程中,CyclicBarrier(循环屏障)是一个非常实用的同步辅助类。它允许一组线程互相等待,直到所有线程都到达某个公共屏障点 (Common Barrier Point)。

它的核心特性是可重用性:当所有等待线程被释放后,屏障可以被重置并再次使用,这也是它被称为“循环”的原因

CyclicBarrier 的实现原理

原理

  1. 核心实现原理 (JDK 8 源码分析) CyclicBarrier 的底层并没有直接使用 AQS (AbstractQueuedSynchronizer) 的共享模式,而是通过 ReentrantLock 和 Condition 组合实现的。

核心内部成员

java
/** 屏障入口的锁 */
private final ReentrantLock lock = new ReentrantLock();
/** 线程等待条件 */
private final Condition trip = lock.newCondition();
/** 屏障需要拦截的线程总数 */
private final int parties;
/** 当前代(Generation)执行完后的回调任务 */
private final Runnable barrierCommand;
/** 当前代 */
private Generation generation = new Generation();
/** 剩余还未到达屏障的线程数 */
private int count;

关键概念:Generation (代)

  • CyclicBarrier 使用一个内部类 Generation 来表示屏障的“当前周期”
  • 每当所有线程到达屏障,或者屏障被重置/损坏时,generation 就会更新
  • 这解决了并发中的“过早释放”问题,确保线程能识别自己是在哪一轮等待

核心方法:dowait 无论是 await() 还是带超时的 await(long, TimeUnit),最终都调用了私有的 dowait 方法。其逻辑如下:

  1. 获取锁:所有操作都在 lock.lock() 保护下进行

  2. 计数减少:count--

  3. 判断是否为最后一个到达者:

    • 如果是最后一个:执行 barrierCommand(如果有),然后调用 nextGeneration()。这个方法会唤醒所有在 trip 条件上等待的线程,并重置 count
    • 如果不是最后一个:进入 for(;;) 循环,调用 trip.await() 挂起线程,等待被唤醒或中断
  4. 核心源码片段解析

java
private int dowait(boolean timed, long nanos) throws ... {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        final Generation g = generation;
        // 如果屏障已损坏,抛出异常
        if (g.broken) throw new BrokenBarrierException();

        int index = --count; // 剩余计数减一
        if (index == 0) {  // 最后一个线程到达
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null) command.run(); // 执行预定任务
                ranAction = true;
                nextGeneration(); // 唤醒所有人并开启下一代
                return 0;
            } finally {
                if (!ranAction) breakBarrier(); // 任务执行失败则损坏屏障
            }
        }

        // 非最后一个线程,进入自旋/等待
        for (;;) {
            try {
                if (!timed) trip.await();
                else if (nanos > 0L) nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                // 如果等待期间被中断,标记屏障损坏并唤醒其它人
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    Thread.currentThread().interrupt();
                }
            }
            // ... 检查异常状态 ...
            if (g != generation) return index; // 正常唤醒,返回索引
        }
    } finally {
        lock.unlock();
    }
}
  1. 与 CountDownLatch 的区别
特性CountDownLatchCyclicBarrier
可复用性一次性,计数归零后失效可循环使用 (reset())
实现机制基于 AQS 共享锁基于 ReentrantLock + Condition
等待角色主线程等待多个子线程多个子线程互相等待
额外功能支持 barrierAction (合并任务)

常用方案演示

模拟 3 个运动员参加田径赛。要求所有人都在起跑线准备好后,裁判才鸣枪出发

java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo {
    public static void main(String[] args) {
        int runnerCount = 3;

        // 定义屏障:3个线程,外加一个所有人到达后的回调任务
        CyclicBarrier barrier = new CyclicBarrier(runnerCount, () -> {
            System.out.println("\n--- 所有人已就位,裁判鸣枪:跑! ---");
        });

        for (int i = 1; i <= runnerCount; i++) {
            final int id = i;
            new Thread(() -> {
                try {
                    System.out.println("运动员 " + id + " 正在穿鞋...");
                    Thread.sleep((long) (Math.random() * 2000));
                    
                    System.out.println("运动员 " + id + " 已到达起跑线,等待他人...");
                    // 线程在此处阻塞,直到第3个线程调用 await()
                    barrier.await();
                    
                    System.out.println("运动员 " + id + " 冲出了起跑线!");
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

运行结果预期: 三个运动员随机完成准备。

前两个到达的运动员会显示“等待他人”。

第三个到达后,触发 barrierCommand 输出“鸣枪”。

随后三个运动员几乎同时输出“冲出了起跑线”。

源码

CyclicBarrier

java
/*
 * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 */

/*
 *
 *
 *
 *
 *
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

package java.util.concurrent;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * A synchronization aid that allows a set of threads to all wait for
 * each other to reach a common barrier point.  CyclicBarriers are
 * useful in programs involving a fixed sized party of threads that
 * must occasionally wait for each other. The barrier is called
 * <em>cyclic</em> because it can be re-used after the waiting threads
 * are released.
 *
 * <p>A {@code CyclicBarrier} supports an optional {@link Runnable}
 * command that is run once per barrier point, after the last thread
 * in the party arrives, but before any threads are released.  This
 * <em>barrier action</em> is useful for updating shared-state before
 * any of the parties continue.
 *
 * <p><b>Sample usage:</b> Here is an example of using a barrier in a
 * parallel decomposition design:
 *
 *  <pre> {@code
 * class Solver {
 *   final int N;
 *   final float[][] data;
 *   final CyclicBarrier barrier;
 *
 *   class Worker implements Runnable {
 *     int myRow;
 *     Worker(int row) { myRow = row; }
 *     public void run() {
 *       while (!done()) {
 *         processRow(myRow);
 *
 *         try {
 *           barrier.await();
 *         } catch (InterruptedException ex) {
 *           return;
 *         } catch (BrokenBarrierException ex) {
 *           return;
 *         }
 *       }
 *     }
 *   }
 *
 *   public Solver(float[][] matrix) {
 *     data = matrix;
 *     N = matrix.length;
 *     Runnable barrierAction =
 *       new Runnable() { public void run() { mergeRows(N); }};
 *     barrier = new CyclicBarrier(N, barrierAction);
 *
 *     List<Thread> threads = new ArrayList<Thread>(N);
 *     for (int i = 0; i < N; i++) {
 *       Thread thread = new Thread(new Worker(i));
 *       threads.add(thread);
 *       thread.start();
 *     }
 *
 *     // wait until done
 *     for (Thread thread : threads)
 *       try { thread.join(); } catch (InterruptedException ex) {}
 *   }
 * }}</pre>
 *
 * Here, each worker thread processes a row of the matrix then waits at the
 * barrier until all rows have been processed. When all threads have
 * reached the barrier, the barrier action executes, which merges the
 * rows. If the merger determines that a solution has been found then
 * {@code done()} will return {@code true} and each worker will
 * terminate.
 *
 * <p>If the barrier action does not rely on the parties being suspended when
 * it is executed, then any of the threads in the party could execute that
 * action when it is released. To facilitate this, each invocation of
 * {@link #await()} returns the arrival index of that thread at the barrier.
 * You can then choose which thread should execute the barrier action, for
 * example:
 *
 *  <pre> {@code
 * if (barrier.await() == 0) {
 *   // log the completion of this iteration
 * }}</pre>
 *
 * <p>The {@code CyclicBarrier} uses an all-or-none breakage model
 * for failed synchronization attempts: If a thread leaves a barrier
 * point prematurely because of interruption, failure, or timeout, all
 * other threads waiting at that barrier point will also leave
 * abnormally via {@link BrokenBarrierException} (or
 * {@link InterruptedException} if they too were interrupted at about
 * the same time).
 *
 * <p>Memory consistency effects: Actions in a thread prior to calling
 * {@code await()} <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
 * actions that are part of the barrier action, which in turn
 * <i>happen-before</i> actions following a successful return from the
 * corresponding {@code await()} in other threads.
 *
 * @since 1.5
 * @see CountDownLatch
 * @author Doug Lea
 */
public class CyclicBarrier {
    /**
     * Each use of the barrier is represented as a generation instance.
     * The generation changes whenever the barrier is tripped, or
     * is reset. There can be many generations associated with threads
     * using the barrier - due to the reset() method.
     */
    private static class Generation {
        boolean broken = false;
    }

    /** The lock for guarding barrier entry */
    private final ReentrantLock lock = new ReentrantLock();
    /** Condition to wait on until tripped */
    private final Condition trip = lock.newCondition();
    /** The number of parties */
    private final int parties;
    /* The command to run when tripped */
    private final Runnable barrierCommand;
    /** The current generation */
    private Generation generation = new Generation();

    /**
     * Number of parties still waiting. Counts down from parties to 0
     * on each generation.  It is reset to parties on each new
     * generation or when broken.
     */
    private int count;

    /**
     * Updates state on barrier trip and wakes up everyone.
     * Called only while holding lock.
     */
    private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();
        // set up next generation
        count = parties;
        generation = new Generation();
    }

    /**
     * Sets current barrier generation as broken and wakes up everyone.
     * Called only while holding lock.
     */
    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }

    /**
     * Main barrier code, covering the various policies.
     */
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException, 
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;

            if (g.broken)
                throw new BrokenBarrierException();

            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

            int index = --count;
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had been
                        // interrupted, so this interrupt is deemed to "belong"
                        // to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Creates a new {@code CyclicBarrier} that will trip when the
     * given number of parties (threads) are waiting upon it, and which
     * will execute the given barrier action when the barrier is tripped,
     * performed by the last thread entering the barrier.
     *
     * @param parties the number of threads that must invoke {@link #await()}
     *        before the barrier is tripped
     * @param barrierAction the command to execute when the barrier is
     *        tripped, or {@code null} if there is no action
     * @throws IllegalArgumentException if {@code parties} is less than 1
     */
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0)
            throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    /**
     * Creates a new {@code CyclicBarrier} that will trip when the
     * given number of parties (threads) are waiting upon it, and
     * does not perform a predefined action when the barrier is tripped.
     *
     * @param parties the number of threads that must invoke {@link #await()}
     *        before the barrier is tripped
     * @throws IllegalArgumentException if {@code parties} is less than 1
     */
    public CyclicBarrier(int parties) {
        this(parties, null);
    }

    /**
     * Returns the number of parties required to trip this barrier.
     *
     * @return the number of parties required to trip this barrier
     */
    public int getParties() {
        return parties;
    }

    /**
     * Waits until all {@linkplain #getParties parties} have invoked
     * {@code await} on this barrier.
     *
     * <p>If the current thread is not the last to arrive then it is
     * disabled for thread scheduling purposes and lies dormant until
     * one of the following things happens:
     * <ul>
     * <li>The last thread arrives;
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread;
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * one of the other waiting threads;
     * <li>Some other thread times out while waiting for the barrier;
     * <li>Some other thread invokes {@link #reset} on this barrier.
     * </ul>
     *
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method;
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the barrier is {@link #reset} while any thread is waiting, then
     * {@link BrokenBarrierException} is thrown for those threads.
     *
     * <p>If any thread is {@linkplain Thread#interrupt interrupted} while waiting,
     * then all other waiting threads will throw
     * {@link BrokenBarrierException} and the barrier is placed in the broken
     * state.
     *
     * <p>If the current thread is the last thread to arrive, and a
     * non-null barrier action was supplied in the constructor, then
     * the current thread runs the action before allowing the other
     * threads to continue.  If an exception occurs during the barrier
     * action then that exception will be propagated in the current
     * thread and the barrier is placed in the broken state.
     *
     * @return the arrival index of the current thread, where index
     *         {@code getParties() - 1} indicates the first to arrive
     *         and zero indicates the last to arrive
     * @throws InterruptedException if the current thread was interrupted
     *         while waiting
     * @throws BrokenBarrierException if <em>another</em> thread was
     *         interrupted or timed out while the current thread was
     *         waiting, or the barrier was reset, or the barrier was
     *         broken when {@code await} was called, or the barrier
     *         action (if present) failed due to an exception
     */
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

    /**
     * Waits until all {@linkplain #getParties parties} have invoked
     * {@code await} on this barrier, or the specified waiting time elapses.
     *
     * <p>If the current thread is not the last to arrive then it is
     * disabled for thread scheduling purposes and lies dormant until
     * one of the following things happens:
     * <ul>
     * <li>The last thread arrives;
     * <li>The specified timeout elapses;
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread;
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * one of the other waiting threads;
     * <li>Some other thread times out while waiting for the barrier;
     * <li>Some other thread invokes {@link #reset} on this barrier.
     * </ul>
     *
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method;
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the specified waiting time elapses then {@link TimeoutException}
     * is thrown.  If the time is less than or equal to zero, the method
     * will not wait at all.
     *
     * <p>If the barrier is {@link #reset} while any thread is waiting, then
     * {@link BrokenBarrierException} is thrown for those threads.
     *
     * <p>If any thread is {@linkplain Thread#interrupt interrupted} while waiting,
     * then all other waiting threads will throw
     * {@link BrokenBarrierException} and the barrier is placed in the broken
     * state.
     *
     * <p>If the current thread is the last thread to arrive, and a
     * non-null barrier action was supplied in the constructor, then
     * the current thread runs the action before allowing the other
     * threads to continue.  If an exception occurs during the barrier
     * action then that exception will be propagated in the current
     * thread and the barrier is placed in the broken state.
     *
     * @param timeout the time to wait for the barrier
     * @param unit the time unit of the timeout parameter
     * @return the arrival index of the current thread, where index
     *         {@code getParties() - 1} indicates the first to arrive
     *         and zero indicates the last to arrive
     * @throws InterruptedException if the current thread was interrupted
     *         while waiting
     * @throws TimeoutException if the specified timeout elapses
     * @throws BrokenBarrierException if <em>another</em> thread was
     *         interrupted or timed out while the current thread was
     *         waiting, or the barrier was reset, or the barrier was
     *         broken when {@code await} was called, or the barrier
     *         action (if present) failed due to an exception
     */
    public int await(long timeout, TimeUnit unit)
        throws InterruptedException, BrokenBarrierException, TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

    /**
     * Queries if this barrier is in a broken state.
     *
     * @return {@code true} if one or more parties broke out of this
     *         barrier due to interruption or timeout since
     *         construction or the last reset, or a barrier action
     *         failed due to an exception; {@code false} otherwise.
     */
    public boolean isBroken() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return generation.broken;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Resets the barrier to its initial state.
     * 
     * <p>If any parties are currently waiting at the barrier, they will
     * return with a {@link BrokenBarrierException}. Note that resetting
     * a barrier that is already in a broken state has no effect.
     */
    public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of parties currently waiting at the barrier.
     * This method is primarily useful for debugging and assertions.
     *
     * @return the number of parties currently blocked in {@link #await()}
     */
    public int getNumberWaiting() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return parties - count;
        } finally {
            lock.unlock();
        }
    }
}