Java并发编程八CountDownLatch、CyclicBarrier、Semaphore和Exchanger

    xiaoxiao2025-06-20  20

    并发工具类

    CountDownLatch方法说明案例说明 CyclicBarrier方法说明案例说明两者不同 Semaphore方法说明阻塞非阻塞 案例说明 Exchanger案例说明 Java并发编程一:并发基础必知 Java并发编程二:Java中线程 Java并发编程三:volatile使用 Java并发编程四:synchronized和lock Java并发编程五:Atomic原子类 Java并发编程六:并发队列 Java并发编程七:ReentrantReadWriteLock和StampedLock

    CountDownLatch

    在日常开发中会遇见这样的场景,在主线程开启多个子线程,并且主线程等待子线程运行完毕在进行汇总,当然我们可以使用join方法来实现,但是join方法不够灵活,不能满足不同场景的需要,Jdk为我们提供了一种实现上述场景的工具类:CountDownLatch是允许一个或多个线程等待其他线程完成操作的工具类。

    方法说明

    CountDownLatch(int count) :初始化,参数为count的计数器 countDown():计算器减1 await() throws InterruptedException :阻塞,等待计算器为0再执行 await(longtimeout, TimeUnit unit) throws InterruptedException :超时阻塞,在一定时间内如果计算器还不为0则执行。 getCount():当前的数量。

    案例说明

    下面的一道题在笔试遇见过:有4个线程分别获取C、D、E、F盘的大小,第5个线程统计总大小。给出了获取磁盘大小的类DiskMemory。

    public class CountDownLatchDemo { static class DiskMemory{ private int totalSize; // 获取磁盘大小 随机产生1000以内的数,+1是为了防止磁盘大小为0 public int getSize(){ return (new Random().nextInt(10)+1)*100; } public void setTotalSize(int size){ totalSize+=size; } public int getTotalSize(){ return totalSize; } } static class MyThread extends Thread{ private String name; private DiskMemory memory; public MyThread(DiskMemory memory,String name){ this.memory=memory; this.name=name; } @Override public void run() { int size = memory.getSize(); System.out.println(Thread.currentThread().getName()+"读取磁盘"+name+"的大小为"+size); memory.setTotalSize(size); // 计算器减1 latch.countDown(); } } // 定义CountDownLatch初始值为4 static CountDownLatch latch = new CountDownLatch(4); static DiskMemory memory=new DiskMemory(); public static void main(String[] args) throws InterruptedException { new Thread(new MyThread(memory,"c"),"线程1").start(); new Thread(new MyThread(memory,"d"),"线程2").start(); new Thread(new MyThread(memory,"e"),"线程3").start(); new Thread(new MyThread(memory,"f"),"线程4").start(); // 等待子线程执行 latch.await(); System.out.println("磁盘总大小为:"+memory.getTotalSize()); } }

    运行结果:

    线程3读取磁盘e的大小为:900 线程2读取磁盘d的大小为:200 线程1读取磁盘c的大小为:500 线程4读取磁盘f的大小为:300 磁盘总大小为:1900

    CountDownLatch的构造函数需要接受一个大于0的参数作为计数器,当调用countDown方法后计数器就会减1,await方法会阻塞当前线程,直到计算器等于0。注意一点就是CountDownLatch不能重修初始化值或者修改内部的计算器的值,一个线程调用countDown方法happen-before于另一个线程调用await方法。

    CyclicBarrier

    可循环使用的屏障。与CountDownLatch不同的时,CyclicBarrier的默认构造方法虽然也是需要传入参数作为计算器,但表示拦截的线程数量,每个线程调用await方法告诉CyclicBarrier到达了屏障点,然后阻塞,直到最后一个线程到达屏障点时所有被拦截的线程同时执行。

    方法说明

    CyclicBarrier(int parties):初始化 指定屏障数是多少。 CyclicBarrier(int parties,Runnable barrierAction):初始化 当所有的线程达到临界运行前执行的线程。 await() throws InterruptedException, BrokenBarrierException :阻塞线程,等待其他线程达到屏障点。 await(long timeout, TimeUnit unit):超时阻塞,阻塞到屏障点的线程在超时后执行。 int getNumberWaiting():返回阻塞的线程数量。 boolean isBroken():用于查询阻塞等待的线程是否被中断。 void reset();:将屏障数重新初始化。

    案例说明

    六个运动员依次进场,等待裁判员说开始,然后再一起出发。

    public class CyclicBarrierDemo { // 初始为6 static CyclicBarrier cb=new CyclicBarrier(6,new Thread(){ @Override public void run() { System.out.println("裁判员:所有的选手都入场----------------------"); } }); public static void main(String[] args) { for (int i = 0; i <6; i++) { new Thread(()->{ try { System.out.println(Thread.currentThread().getName()+"选手已经入场,等待比赛开始..."); System.out.println("当前阻塞的线程数量为:"+cb.getNumberWaiting()); cb.await(); System.out.println(Thread.currentThread().getName()+"选手出发了..."); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }).start(); } } }

    运行结果:

    Thread-1选手已经入场,等待比赛开始... 当前阻塞的线程数量为:0 Thread-3选手已经入场,等待比赛开始... 当前阻塞的线程数量为:1 Thread-5选手已经入场,等待比赛开始... 当前阻塞的线程数量为:2 Thread-2选手已经入场,等待比赛开始... 当前阻塞的线程数量为:3 Thread-6选手已经入场,等待比赛开始... 当前阻塞的线程数量为:4 Thread-4选手已经入场,等待比赛开始... 当前阻塞的线程数量为:5 裁判员:所有的选手都入场---------------------- Thread-4选手出发了... Thread-1选手出发了... Thread-3选手出发了... Thread-5选手出发了... Thread-2选手出发了... Thread-6选手出发了...

    两者不同

    CyclicBarrier的计数器可以被reset重置或者再次使用await方法,CountDownLatch的计数器只能使用一次。CyclicBarrier的构造方法提供了一个线程, 可以在所有线程达到屏障点开始运行前优先运行。CountDownLatch没有提供。

    Semaphore

    控制并发线程访问的个数,通过acquire() 获取一个许可,如果没有就等待, release() 释放一个许可,唤醒阻塞的线程。

    方法说明

    阻塞

    void acquire():用来获取一个许可证,没有则阻塞,直到获取成功。 void acquire(int permits):用来获取permits个许可证,没有则阻塞,直到获取成功。 void release():释放一个许可,必须在获取许可之后才能释放。 void release(int permits) { }:释放 permits 个许可。

    非阻塞

    boolean tryAcquire():尝试获取一个许可,成功为true,失败为false。 int availablePermits():返回此信号量中当前可用的许可证数。 int getQueueLength():返回正在等待获取许可证的线程数。 boolean hasQueuedThreads():是否有线程正在等待获取许可证。

    案例说明

    十个线程链接数据库,而数据库最多可链接三个,只有操作完毕后,释放链接后其他线程才能操作。

    public class SemaphoreDemo { public static void main(String[] args) { // 初始化为 3 Semaphore semaphore = new Semaphore(3); for (int i = 0; i <10; i++) { new Thread(()->{ try { // 获得许可 semaphore.acquire(); System.out.println(Thread.currentThread().getName()+"获得了连接池...当前数据库的连接池有"+semaphore.availablePermits()+"个"); // 模拟操作数据库 Thread.sleep(100); System.out.println(Thread.currentThread().getName()+"操作完毕...当前阻塞的线程有"+semaphore.getQueueLength()+"个"); // 释放许可 semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } } }

    运行结果:

    Thread-0获得了连接池…当前数据库的连接池有2个 Thread-4获得了连接池…当前数据库的连接池有1个 Thread-2获得了连接池…当前数据库的连接池有0个 Thread-0操作完毕…当前阻塞的线程有7个 Thread-6获得了连接池…当前数据库的连接池有0个 Thread-4操作完毕…当前阻塞的线程有6个 Thread-8获得了连接池…当前数据库的连接池有0个 Thread-2操作完毕…当前阻塞的线程有5个 Thread-1获得了连接池…当前数据库的连接池有0个 Thread-6操作完毕…当前阻塞的线程有4个 Thread-5获得了连接池…当前数据库的连接池有0个 Thread-8操作完毕…当前阻塞的线程有3个 Thread-9获得了连接池…当前数据库的连接池有0个 Thread-1操作完毕…当前阻塞的线程有2个 Thread-3获得了连接池…当前数据库的连接池有0个 Thread-5操作完毕…当前阻塞的线程有1个 Thread-7获得了连接池…当前数据库的连接池有0个 Thread-9操作完毕…当前阻塞的线程有0个 Thread-3操作完毕…当前阻塞的线程有0个 Thread-7操作完毕…当前阻塞的线程有0个

    Exchanger

    一个线程间协作的工具类,用于线程间的数据交换。两个线程之间通过exchange()方法来交换数据,如果第一次线程调用了Exchanger,它会一直等待另外一个线程调用exchange方法来交换数据,如果不想一直等待调用exchange(V x,longtimeout,TimeUnit unit)设置最大等待时长。

    案例说明

    模拟一个场景:男孩和女孩相遇在校园相遇了,然后男孩对女孩表白。

    public class ExchangerDemo { static Exchanger<String> ex= new Exchanger<String>(); public static void main(String[] args) { System.out.println("在校园某处,男孩和女孩相遇了....."); // 男孩 new Thread(()->{ try { // 交换数据 String msg = ex.exchange("我暗恋你很久了"); // 得到回应 System.out.println("女孩:"+msg); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); // 女孩 new Thread(()->{ try { // 女孩想了想 Thread.sleep(100); // 交换数据 String msg= ex.exchange("其实我也喜欢你"); // 得到回应 System.out.println("男孩:"+msg); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } }

    输出结果:

    在校园某处,男孩和女孩相遇了… 男孩:我暗恋你很久了 女孩:其实我也喜欢你

    最新回复(0)