Skip to content

多线程

更新: 1/23/2026 字数: 0 字

线程数计算

text
# I/O 密集型
最佳线程数 = CPU 核数 * CPU 利用率 * [ 1 +(I/O 耗时 / CPU 耗时)]

学习总结

1. 并发问题的根源

产生并发的原因是 CPU、内存、磁盘速度的差异。在硬件和软件方面解决速度差异引发的并发问题:

  • CPU 缓存 → 可见性问题
  • 线程切换 → 原子性问题
  • 编译优化 → 重排序问题

2. 并发问题的解决方法

  • 可见性解决方法:volatilesynchronized
  • 原子性解决方法:互斥锁
  • 重排序解决方法:volatile(禁止编译优化)

3. 死锁问题

解决并发原子性产生的问题:死锁

死锁产生的条件

  • ① 资源互斥
  • ② 不能抢占
  • ③ 占有且等待
  • ④ 循环等待

死锁的解决办法

  • ① 按锁的顺序获取
  • ② 增加锁的分配器

4. 并发问题的宏观分析

以上都是从微观角度进行分析并发问题,宏观角度包括:

  • 安全问题
  • 性能问题
  • 活跃性问题

5. 并发的本质

从本质看问题:管程

6. 线程的实际应用

实际看问题:

  • Java 线程生命周期
  • 线程数的分配
  • 线程的执行

7. 面向对象解决并发问题

以子之矛攻子之盾,面向对象解决并发问题:

  • 属性使用 final 修饰
  • 属性设为私有
  • 只有 getter 方法没有 setter 方法
  • 属性赋值时进行深复制再操作

提示

以上代码为阻塞队列的实现,使用了 LockCondition 来解决并发问题。

java
public class BlockedQueue<T> {
    private final Object[] items;
    private int head = 0;
    private int tail = 0;
    private int count = 0;
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    public BlockedQueue(int size) {
        this.items = new Object[size];
    }

    /**
     * 入队
     */
    public void enq(T x) throws InterruptedException {
        lock.lock();
        try {
            // 使用 while 防止虚假唤醒
            while (count == items.length) {
                System.out.println("队列已满,生产中等待。。。");
                notFull.await();
            }
            // 执行入队操作
            items[tail] = x;
            // 环形逻辑
            if (++tail == items.length) {
                tail = 0;
            }
            count++;
            // 通知消费者出队
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    /**
     * 出队
     */
    @SuppressWarnings("unchecked")
    public T deq() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                System.out.println("队列已空,消费者等待。。。");
                notEmpty.await();
            }
            // 执行出队操作
            T x = (T) items[head];
            // 助于 GC
            items[head] = null;
            // 环形逻辑
            if (++head == items.length) {
                head = 0;
            }
            count--;
            // 通知生产者可以继续入队
            notFull.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        BlockedQueue<Integer> queue = new BlockedQueue<>(2);

        // 定义参与同步的线程总数:1个生产者 + 1个消费者 = 2
        int threadCount = 2;
        CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {
            System.out.println("--- 所有线程准备就绪,同时开始执行! ---");
        });

        // 消费者线程
        Thread consumer = new Thread(() -> {
            try {
                System.out.println("消费者已到达起跑线...");
                // 等待信号
                barrier.await();
                for (int i = 0; i < 50; i++) {
                    Integer x = queue.deq();
                    System.out.println(Thread.currentThread().getName() + "-" + System.currentTimeMillis() + "-消费了: " + x);
                    // 模拟消费慢
                    Thread.sleep(1000);
                }
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        });

        // 生产者线程
        Thread producer = new Thread(() -> {
            try {
                System.out.println("生产者已到达起跑线...");
                // 等待信号
                barrier.await();
                for (int i = 0; i < 50; i++) {
                    System.out.println(Thread.currentThread().getName() + "-" + System.currentTimeMillis() + "-生产了: " + i);
                    queue.enq(i);
                }
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        });

        // 启动线程
        producer.start();
        consumer.start();

    }

}