Skip to content

并发设计模式

更新: 3/4/2026 字数: 0 字

一、Immutability 模式

不变性(Immutability)模式

所谓不变性,简单来讲,就是对象一旦被创建之后,状态就不再发生变化。换句话说,就是变量一旦被赋值,就不允许修改了(没有写操作);没有修改操作,也就是保持了不变性。

1.1 快速实现具备不可变性的类

将一个类所有的属性都设置成 final 的,并且只允许存在只读方法,那么这个类基本上就具备不可变性了。

更严格的做法是这个类本身也是 final 的,也就是不允许继承。

Java SDK 里很多类都具备不可变性,如经常用到的 StringLongIntegerDouble 等基础类型的包装类都具备不可变性。这些对象的线程安全性都是靠不可变性来保证的。如果你仔细翻看这些类的声明、属性和方法,你会发现它们都严格遵守不可变类的三点要求:类和属性都是 final 的,所有方法均是只读的。

1.2 享元模式(Flyweight Pattern)

利用享元模式可以减少创建对象的数量,从而减少内存占用。Java 语言里面 LongIntegerShortByte 等这些基本数据类型的包装类都用到了享元模式。

享元模式

享元模式本质上其实就是一个对象池,利用享元模式创建对象的逻辑也很简单:

  • 创建之前,首先去对象池里看看是不是存在
  • 如果已经存在,就利用对象池里的对象
  • 如果不存在,就会新创建一个对象,并且把这个新创建出来的对象放进对象池里

二、Copy-on-Write 模式

Copy-on-Write 模式(写时复制)

Copy-on-Write 模式,简称 COW 模式,是一种延时策略的并发设计模式。

使用 Copy-on-Write 更多地体现的是一种延时策略,只有在真正需要复制的时候才复制,而不是提前复制好。同时 Copy-on-Write 还支持按需复制,所以 Copy-on-Write 在操作系统领域是能够提升性能的。

相比较而言,Java 提供的 Copy-on-Write 容器,由于在修改的同时会复制整个容器,所以在提升读操作性能的同时,是以内存复制为代价的。这里你会发现,同样是应用 Copy-on-Write,不同的场景,对性能的影响是不同的。

Copy-on-Write 模式的应用场景

不过,Copy-on-Write 最大的应用领域还是在函数式编程领域。

函数式编程的基础是不可变性(Immutability),所以函数式编程里面所有的修改操作都需要 Copy-on-Write 来解决。

你或许会有疑问,"所有数据的修改都需要复制一份,性能是不是会成为瓶颈呢?"你的担忧是有道理的,之所以函数式编程早年间没有兴起,性能绝对拖了后腿。但是随着硬件性能的提升,性能问题已经慢慢变得可以接受了。

而且,Copy-on-Write 也远不像 Java 里的 CopyOnWriteArrayList 那样笨:整个数组都复制一遍。Copy-on-Write 也是可以按需复制的。

三、线程本地存储模式(ThreadLocal)

理解 ThreadLocal 的原理,核心在于打破"数据共享"的思维,转而实现"数据隔离"。简单来说,它不是为了解决多线程修改共享变量的问题,而是为每个线程提供一个专属的变量副本。

3.1 核心原理:谁在存数据?

很多人误以为数据存在 ThreadLocal 对象里,其实不然。

  • 数据载体:真正的存储容器是 ThreadLocalMap
  • 持有者:每个 Thread 线程对象内部都有一个名为 threadLocals 的成员变量,它的类型就是 ThreadLocalMap
  • Key 与 Value:在这个 Map 中,KeyThreadLocal 对象本身,Value 是你存进去的值

源码关键点(JDK 8+)

在 Thread.java 中:

java
public class Thread implements Runnable {
    // ...
    ThreadLocal.ThreadLocalMap threadLocals = null;
    // ...
}

在 ThreadLocal.java 的 set 方法中:

java
public void set(T value) {
    Thread t = Thread.currentThread(); // 1. 获取当前线程
    ThreadLocalMap map = getMap(t);    // 2. 拿到当前线程的 ThreadLocalMap
    if (map != null)
        map.set(this, value);          // 3. 以当前的 ThreadLocal 对象为 Key 存入
    else
        createMap(t, value);           // 4. 如果 Map 还没初始化则创建
}

3.2 代码示例:感受隔离性

下面的例子展示了即使两个线程操作同一个 ThreadLocal 变量,它们的数据也是互不干扰的。

java
public class ThreadLocalExample {
    private static final ThreadLocal<Integer> threadLocal = new ThreadLocal<>();

    public static void main(String[] args) {
        // 线程 1 设置值为 100
        Thread thread1 = new Thread(() -> {
            threadLocal.set(100);
            System.out.println("Thread 1: " + threadLocal.get());
        });

        // 线程 2 设置值为 200
        Thread thread2 = new Thread(() -> {
            threadLocal.set(200);
            System.out.println("Thread 2: " + threadLocal.get());
        });

        thread1.start();
        thread2.start();
    }
}

3.3 深入理解 ThreadLocalMap 与内存泄漏

ThreadLocalMapThreadLocal 的静态内部类,它没有实现 Map 接口,而是自己搞了一套哈希表逻辑。

弱引用(WeakReference)

ThreadLocalMap 的 Entry 继承了 WeakReference<ThreadLocal<?>>

java
static class Entry extends WeakReference<ThreadLocal<?>> {
    Object value;
    Entry(ThreadLocal<?> k, Object v) {
        super(k); // Key 被包装成了弱引用
        value = v;
    }
}
  • 为什么用弱引用?

    • 如果 ThreadLocal 对象没有外部强引用了,那么在垃圾回收(GC)时,Map 里的 Key(即 ThreadLocal)就会被回收
  • 内存泄漏的风险:

    • 虽然 Key 被回收了,但 Value 是强引用。如果线程迟迟不结束(比如线程池中的长驻线程),这块 Value 就永远无法被回收,从而导致内存泄漏
    • 最佳实践:每次用完 ThreadLocal,务必调用 remove() 方法手动清理

InheritableThreadLocal 与继承性

InheritableThreadLocalThreadLocal 的子类,它的作用是在创建新线程时,将父线程的 ThreadLocal 变量值复制到子线程中。

代码示例:InheritableThreadLocal 继承性

java
public class InheritableThreadLocalExample {
    private static final InheritableThreadLocal<Integer> inheritableThreadLocal = new InheritableThreadLocal<>();

    public static void main(String[] args) {
        // 父线程设置值为 100
        inheritableThreadLocal.set(100);

        // 创建子线程
        Thread thread = new Thread(() -> {
            System.out.println("Child Thread: " + inheritableThreadLocal.get()); // 子线程也能获取到父线程的值
        });

        thread.start();
    }
}

不建议在线程池中使用 InheritableThreadLocal

不仅仅是因为它具有 ThreadLocal 相同的缺点——可能导致内存泄露,更重要的原因是:线程池中线程的创建是动态的,很容易导致继承关系错乱。如果你的业务逻辑依赖 InheritableThreadLocal,那么很可能导致业务逻辑计算错误,而这个错误往往比内存泄露更要命。

四、Guarded Suspension 模式(保护性地暂停)

Guarded Suspension 模式是一种用于解决多线程并发问题的设计模式,它的核心思想是:当一个线程需要等待某个条件满足时,它会进入"等待状态",而不是忙等(busy-waiting)。

4.1 Guarded Suspension 模式结构图

Guarded Suspension 模式结构图

GuardedObject 的内部实现非常简单,是管程的一个经典用法。你可以参考下面的示例代码,核心是:

  • get() 方法通过条件变量的 await() 方法实现等待
  • onChanged() 方法通过条件变量的 signalAll() 方法实现唤醒功能
java
// synchronized 关键字确保了 waitForCondition() 和 onChanged() 方法的原子性执行
public class GuardedObject {
    private final Object monitor = new Object();
    private boolean condition = false;

    public void waitForCondition() {
        synchronized (monitor) {
            while (!condition) {
                try {
                    monitor.wait(); // 线程进入等待状态
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    public void onChanged() {
        synchronized (monitor) {
            condition = true;
            monitor.notifyAll(); // 唤醒所有等待线程
        }
    }
}

// ReentrantLock 确保了 get() 和 onChanged() 方法的原子性执行
public class GuardedObject<T> {
    // 受保护的对象
    T obj;
    final Lock lock = new ReentrantLock();
    final Condition done = lock.newCondition();
    final int timeout = 1;

    // 获取受保护对象
    T get(Predicate<T> p) {
        lock.lock();
        try {
            // MESA 管程推荐写法
            while (!p.test(obj)) {
                done.await(timeout, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
        // 返回非空的受保护对象
        return obj;
    }

    // 事件通知方法
    void onChanged(T obj) {
        lock.lock();
        try {
            this.obj = obj;
            done.signalAll();
        } finally {
            lock.unlock();
        }
    }
}

五、Balking 模式(再谈线程安全的单例模式)

当状态满足某个条件时,执行某个业务逻辑,其本质其实不过就是一个 if 而已。放到多线程场景里,就是一种"多线程版本的 if"。

这种"多线程版本的 if"的应用场景还是很多的,所以也有人把它总结成了一种设计模式,叫做 Balking 模式。

5.1 Balking 模式的经典实现

java
boolean changed = false;

// 自动存盘操作
void autoSave() {
    synchronized (this) {
        if (!changed) {
            return;
        }
        changed = false;
    }
    // 执行存盘操作
    // 省略具体实现
    this.execSave();
}

// 编辑操作
void edit() {
    // 省略编辑逻辑
    // ...
    change();
}

// 改变状态
void change() {
    synchronized (this) {
        changed = true;
    }
}

和 Guarded Suspension 模式从实现上看似乎没有多大的关系,Balking 模式只需要用互斥锁就能解决,而 Guarded Suspension 模式则要用到管程这种高级的并发原语。

但是从应用的角度来看,它们解决的都是"线程安全的 if"语义,不同之处在于:

  • Guarded Suspension 模式会等待 if 条件为真
  • Balking 模式不会等待

Balking 模式的经典实现是使用互斥锁,你可以使用 Java 语言内置的 synchronized,也可以使用 SDK 提供的 Lock。如果你对互斥锁的性能不满意,可以尝试采用 volatile 方案,不过使用 volatile 方案需要你更加谨慎。

当然你也可以尝试使用双重检查方案来优化性能,双重检查中的第一次检查,完全是出于对性能的考量:避免执行加锁操作,因为加锁操作很耗时。而加锁之后的二次检查,则是出于对安全性负责。双重检查方案在优化加锁性能方面经常用到。

六、Thread-Per-Message 模式(最简单实用的分工方法)

并发编程领域的问题总结为三个核心问题:分工、同步和互斥

其中,同步和互斥相关问题更多地源自微观,而分工问题则是源自宏观。我们解决问题,往往都是从宏观入手。在编程领域,软件的设计过程也是先从概要设计开始,而后才进行详细设计。同样,解决并发编程问题,首要问题也是解决宏观的分工问题。

并发编程领域里,解决分工问题也有一系列的设计模式,比较常用的主要有 Thread-Per-Message 模式Worker Thread 模式生产者-消费者模式等等。

6.1 Thread-Per-Message 模式

简言之就是为每个任务分配一个独立的线程。这是一种最简单的分工方法,实现起来也非常简单。

一个最经典的应用场景是网络编程里服务端的实现,服务端为每个客户端请求创建一个独立的线程。当线程处理完请求后,自动销毁,这是一种最简单的并发处理网络请求的方法。

网络编程里最简单的程序当数 echo 程序了,echo 程序的服务端会原封不动地将客户端的请求发送回客户端。例如,客户端发送 TCP 请求"Hello World",那么服务端也会返回"Hello World"。

java
final ServerSocketChannel ssc = ServerSocketChannel.open().bind(new InetSocketAddress(8080));

// 处理请求
try {
    while (true) {
        // 接收请求
        SocketChannel sc = ssc.accept();
        // 每个请求都创建一个线程
        new Thread(() -> {
            try {
                // 读 Socket
                ByteBuffer rb = ByteBuffer.allocateDirect(1024);
                sc.read(rb);
                // 模拟处理请求
                Thread.sleep(2000);
                // 写 Socket
                ByteBuffer wb = (ByteBuffer) rb.flip();
                sc.write(wb);
                // 关闭 Socket
                sc.close();
            } catch (Exception e) {
                throw new UncheckedIOException(e);
            }
        }).start();
    }
} finally {
    ssc.close();
}

七、Worker Thread 模式(如何避免重复创建线程?)

Worker Thread 模式可以类比现实世界里车间的工作模式:车间里的工人,有活儿了,大家一起干,没活儿了就聊聊天等着。

Worker Thread 模式

下面的示例代码是用线程池实现的 echo 服务端。相比于 Thread-Per-Message 模式的实现,改动非常少,仅仅是创建了一个最多线程数为 500 的线程池 es,然后通过 es.execute() 方法将请求处理的任务提交给线程池处理。

java
ExecutorService es = Executors.newFixedThreadPool(500);
final ServerSocketChannel ssc = ServerSocketChannel.open().bind(new InetSocketAddress(8080));

// 处理请求
try {
    while (true) {
        // 接收请求
        SocketChannel sc = ssc.accept();
        // 将请求处理任务提交给线程池
        es.execute(() -> {
            try {
                // 读 Socket
                ByteBuffer rb = ByteBuffer.allocateDirect(1024);
                sc.read(rb);
                // 模拟处理请求
                Thread.sleep(2000);
                // 写 Socket
                ByteBuffer wb = (ByteBuffer) rb.flip();
                sc.write(wb);
                // 关闭 Socket
                sc.close();
            } catch (Exception e) {
                throw new UncheckedIOException(e);
            }
        });
    }
} finally {
    ssc.close();
    es.shutdown();
}

八、两阶段终止模式(如何优雅地终止线程?)

Java 语言的 Thread 类中曾经提供了一个 stop() 方法,用来终止线程,可是早已不建议使用了。原因是这个方法用的就是一剑封喉的做法,被终止的线程没有机会料理后事。

8.1 如何理解两阶段终止模式

第一个阶段主要是线程 T1 向线程 T2 发送终止指令,而第二阶段则是线程 T2 响应终止指令。

两阶段终止模式

Java 线程进入终止状态的前提是线程进入 RUNNABLE 状态,而实际上线程也可能处在休眠状态。也就是说,我们要想终止一个线程,首先要把线程的状态从休眠状态转换到 RUNNABLE 状态。

如何做到呢?

Java 中的线程状态转换图:

Java 线程状态转换图

这个要靠 Java Thread 类提供的 interrupt() 方法,它可以将休眠状态的线程转换到 RUNNABLE 状态。

线程转换到 RUNNABLE 状态之后,我们如何再将其终止呢?RUNNABLE 状态转换到终止状态,优雅的方式是让 Java 线程自己执行完 run() 方法。所以一般我们采用的方法是设置一个标志位,然后线程会在合适的时机检查这个标志位,如果发现符合终止条件,则自动退出 run() 方法。这个过程其实就是我们前面提到的第二阶段:响应终止指令。

综合上面这两点,我们能总结出终止指令,其实包括两方面内容:interrupt() 方法和线程终止的标志位。

JVM 的异常处理会清除线程的中断状态

java
try {
    Thread.sleep(2000);
} catch (InterruptedException e) {
    // 重新设置线程中断状态
    Thread.currentThread().interrupt();
}

强烈建议你设置自己的线程终止标志位,而不是依赖 interrupt() 方法。

下面的代码中,使用 isTerminated 作为线程终止标志位,此时无论是否正确处理了线程的中断异常,都不会影响线程优雅地终止。

java
class Proxy {
    // 线程终止标志位
    volatile boolean terminated = false;
    boolean started = false;
    // 采集线程
    Thread rptThread;

    // 启动采集功能
    synchronized void start() {
        // 不允许同时启动多个采集线程
        if (started) {
            return;
        }
        started = true;
        terminated = false;
        rptThread = new Thread(() -> {
            while (!terminated) {
                // 省略采集、回传实现
                report();
                // 每隔两秒钟采集、回传一次数据
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    // 重新设置线程中断状态
                    Thread.currentThread().interrupt();
                }
            }
            // 执行到此处说明线程马上终止
            started = false;
        });
        rptThread.start();
    }

    // 终止采集功能
    synchronized void stop() {
        // 设置中断标志位
        terminated = true;
        // 中断线程 rptThread
        rptThread.interrupt();
    }
}

8.2 如何优雅地终止线程池

线程池提供了两个方法:shutdown()shutdownNow()

shutdown() 方法是一种很保守的关闭线程池的方法。线程池执行 shutdown() 后,就会拒绝接收新的任务,但是会等待线程池中正在执行的任务和已经进入阻塞队列的任务都执行完之后才最终关闭线程池。

线程池执行 shutdownNow() 后,会拒绝接收新的任务,同时还会中断线程池中正在执行的任务。已经进入阻塞队列的任务也被剥夺了执行的机会,不过这些被剥夺执行机会的任务会作为 shutdownNow() 方法的返回值返回。

因为 shutdownNow() 方法会中断正在执行的线程,所以提交到线程池的任务,如果需要优雅地结束,就需要正确地处理线程中断。

如果提交到线程池的任务不允许取消,那就不能使用 shutdownNow() 方法终止线程池。不过,如果提交到线程池的任务允许后续以补偿的方式重新执行,也是可以使用 shutdownNow() 方法终止线程池的。

shutdown()shutdownNow() 它们实质上使用的也是两阶段终止模式,只是终止指令的范围不同而已:前者只影响阻塞队列接收任务,后者范围扩大到线程池中所有的任务。

九、生产者-消费者模式:用流水线思想提高效率

生产者 - 消费者模式示意图 alt text

生产者 - 消费者模式是一个不错的解耦方案

还有一个重要的优点就是支持异步,并且能够平衡生产者和消费者的速度差异

java
//任务队列
BlockingQueue<Task> bq = new LinkedBlockingQueue<>(2000);
//启动5个消费者线程
//执行批量任务  
void start() {
  ExecutorService es = executors.newFixedThreadPool(5);
  for (int i=0; i<5; i++) {
    es.execute(()->{
      try {
        while (true) {
          //获取批量任务
          List<Task> ts=pollTasks();
          //执行批量任务
          execTasks(ts);
        }
      } catch (Exception e) {
        e.printStackTrace();
      }
    });
  }
}
//从任务队列中获取批量任务
List<Task> pollTasks() throws InterruptedException{
  List<Task> ts=new LinkedList<>();
  //阻塞式获取一条任务
  Task t = bq.take();
  while (t != null) {
    ts.add(t);
    //非阻塞式获取一条任务
    t = bq.poll();
  }
  return ts;
}
//批量执行任务
execTasks(List<Task> ts) {
  //省略具体代码无数
}