JAVA多线程基础 之九 CountdownLatch&CyclicBarrier&Semaphore的使用

    xiaoxiao2022-07-02  129

    CountdownLatch

    CountDownLatch 类位于java.util.concurrent包下,利用它可以实现类似计数器的功能。比如有一个任务A,它要等待其他4个任务执行完毕之后才能执行,此时就可以利用CountDownLatch来实现这种功能了。CountDownLatch是通过一个计数器来实现的,计数器的初始值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。

    notify和wait的作用,给线程之间提供了通信,但是也是存在一定的弊端,就是不实时的问题,也就是当t1线程的notify()向t2线程发出通知,由于notify不释放锁,导致结果t1线程执行完毕后,t2线程才会去执行。我们要的效果是:当t2线程接收到通知以后t2线程就开始执行,而不是等待t1执行完毕后才执行。

    使用CountDownLatch这个类可以使线程间通信更为及时。

    public class ListAdd {     private volatile static List list = new ArrayList<>();     public void add(){         list.add("demo");     }     public int size(){         return list.size();     } //    ==================== 使用countdownLatch =====================     //    使用CountdownLatch替换, //    countDownLatch.countDown()替换nitify() //    countDownLatch.await()替换wait()     public static void main(String[] args){         final  ListAdd listAdd = new ListAdd();     final CountDownLatch countDownLatch = new CountDownLatch(1);     Thread t1 = new Thread(new Runnable() {         @Override         public void run() { //                synchronized (lock) {             try {                 for (int i = 0; i < 10; i++) {                     listAdd.add();                     System.out.println("当前线程:" + Thread.currentThread().getName() + " i: " + i);                     Thread.sleep(500);                     if (list.size() == 5) {                         System.out.println("发出通知"); //                                lock.notify();                         countDownLatch.countDown();                     }                 }             } catch (InterruptedException e) {                 e.printStackTrace();             } //                }         }     },"t1");     Thread t2 = new Thread(new Runnable() {         @Override         public void run() { //                synchronized (lock) {             if (list.size() != 5) {                 try { //                                System.out.println(Thread.currentThread().getName()+ "释放锁等待"); //                                lock.wait();                     countDownLatch.await();                 } catch (InterruptedException e) {                     e.printStackTrace();                 }             }             //Thread2 STOP             System.out.println("收到通知: " + Thread.currentThread().getName() + " Thread stop .");             throw new RuntimeException(); //                }         }     },"t2");     t2.start();     t1.start(); } }

     

     修改之前的模拟队列

    public class LinkedBlockingQueueDemo2 {     List list = new ArrayList<>();     AtomicInteger count = new AtomicInteger(0);     int minSize = 0;     int maxsize ;     private CountDownLatch countDownLatch;     public LinkedBlockingQueueDemo2(int maxsize ,CountDownLatch countDownLatch) {         this.maxsize = maxsize;         this.countDownLatch = countDownLatch;     }     Object lock = new Object();     public void put(String str){ //        synchronized (lock){             if(list.size()==maxsize)             {                 System.out.println("list full");                 try { //                    lock.wait();                     countDownLatch.await();                 } catch (InterruptedException e) {                     e.printStackTrace();                 }             }else             {                 list.add(str);                 try {                     Thread.sleep(1000);                 } catch (InterruptedException e) {                     e.printStackTrace();                 }                 System.out.println("start put " + str + " start ");                 count.incrementAndGet();                 System.out.println("start put " + str + " end "); //                lock.notify();                 countDownLatch.countDown();             } //        }     }     public void take(String str){         synchronized (lock){             if(list.size()==minSize)             {                 System.out.println("list empty");                 try { //                    lock.wait();                     countDownLatch.await();                 } catch (InterruptedException e) {                     e.printStackTrace();                 }             }else             {                 list.remove(0);                 try {                     Thread.sleep(1000);                 } catch (InterruptedException e) {                     e.printStackTrace();                 }                 System.out.println("start take " + str + " start ");                 count.decrementAndGet();                 System.out.println("start take " + str + " end "); //                lock.notify();                 countDownLatch.countDown();             }         }     }     public static void main(String[] args){         CountDownLatch countDownLatch = new CountDownLatch(1);         LinkedBlockingQueueDemo2 demo = new LinkedBlockingQueueDemo2(10,countDownLatch);         Thread t1 = new Thread(new Runnable() {             @Override             public void run() {                 for (int i = 0; i < 10; i++) {                     demo.put("str"+i);                 }             }         },"t1");         Thread t2 = new Thread(new Runnable() {             @Override             public void run() {                 for (int i = 0; i < 10; i++) {                     demo.take("str"+i);                 }             }         },"t2");         t1.start();         t2.start();     } }

    (屏障)CyclicBarrier

    CyclicBarrier初始化时规定一个数目,然后计算调用了CyclicBarrier.await()进入等待的线程数。当线程数达到了这个数目时,所有进入等待状态的线程被唤醒并继续。

    CyclicBarrier就象它名字的意思一样,可看成是个障碍, 所有的线程必须到齐后才能一起通过这个障碍。

    CyclicBarrier初始时还可带一个Runnable的参数, 此Runnable任务在CyclicBarrier的数目达到后,所有其它线程被唤醒前被执行。

    public class CyclicBarrierDemo {     public static void main(String[] args){         CyclicBarrier cyclicBarrier = new CyclicBarrier(3);         for(int i = 0 ;i < 3 ;i++)         {             Writer writer = new Writer(cyclicBarrier);             new Thread(writer).start();         }         System.out.println("=================");     } } class Writer extends Thread {     private CyclicBarrier cyclicBarrier;     public Writer(CyclicBarrier cyclicBarrier){         this.cyclicBarrier=cyclicBarrier;     }     @Override     public void run() {         System.out.println("线程" + Thread.currentThread().getName() + ",正在写入数据");         try {             Thread.sleep(3000);         } catch (Exception e) {             // TODO: handle exception         }         System.out.println("线程" + Thread.currentThread().getName() + ",写入数据成功.....");         try {             cyclicBarrier.await();         } catch (Exception e) {         }         System.out.println("所有线程执行完毕..........");     } }

     

    分批处理

    /**  * Created by zhanghaipeng on 2018/11/8.  * 假设有人排队,我们将其分成 5 个人一批,  */ public class CyclicBarrierSample {     public static void main(String[] args) throws InterruptedException {         CyclicBarrier barrier = new CyclicBarrier(5, new Runnable() {             @Override             public void run() {                 System.out.println("Action...GO again!");                 }                });            for (int i = 0; i < 5; i++) {             Thread t = new Thread(new CyclicWorker(barrier));             t.start();             }     }     static class CyclicWorker implements Runnable {            private CyclicBarrier barrier;            public CyclicWorker(CyclicBarrier barrier) {             this.barrier = barrier;             }            @Override            public void run() {             try {                 for (int i=0; i<3 ; i++){                     System.out.println("finish " + (i+1) + " job");                     Thread.sleep(3000);                     barrier.await();                     }                 } catch (BrokenBarrierException e) {                 e.printStackTrace();                 } catch (InterruptedException e) {                 e.printStackTrace();                 }             }     } }

    (计数信号量)Semaphore

    Semaphore是一种基于计数的信号量。它可以设定一个阈值,基于此,多个线程竞争获取许可信号,做自己的申请后归还,超过阈值后,线程申请许可信号将会被阻塞。Semaphore可以用来构建一些对象池,资源池之类的,比如数据库连接池,我们也可以创建计数为1的Semaphore,将其作为一种类似互斥锁的机制,这也叫二元信号量,表示两种互斥状态。它的用法如下:

    availablePermits函数用来获取当前可用的资源数量

    wc.acquire(); //申请资源

    wc.release();// 释放资源

    // 创建一个计数阈值为5的信号量对象  // 只能5个线程同时访问  Semaphore semp = new Semaphore(5); try {     // 申请许可      semp.acquire();     try {         // 业务逻辑      } catch (Exception e) {     } finally {         // 释放许可          semp.release();     } } catch (InterruptedException e) { }

     

    例子:图书馆看书

    public class SemophoreDemo {     public static void main(String[] args){         // 创建一个计数阈值为5的信号量对象         // 只能5个线程同时访问         Semaphore semp = new Semaphore(5);         for (int i = 1; i <= 10; i++) {             Library library = new Library("" + i + "个人", semp);             library.start();         }     } } class Library extends Thread {     private String name;     private Semaphore seat;     public Library(String name, Semaphore seat) {         this.name = name;         this.seat = seat;     }     @Override     public void run() {         // 剩下的资源         int availablePermits = seat.availablePermits();         if (availablePermits > 0) {             System.out.println(name + "有位子了.....");         } else {             System.out.println(name + "怎么没有位子了...");         }         try {             // 申请资源             seat.acquire();         } catch (InterruptedException e) {         }         System.out.println(name + "坐下看书" + ",剩下位子:" + seat.availablePermits());         try {             Thread.sleep(new Random().nextInt(1000));         } catch (Exception e) {             // TODO: handle exception         }         System.out.println(name + "看完书!");         // 释放资源         seat.release();     } }

     

    不规范semaphore(分组释放)

    /**  * Created by zhanghaipeng on 2018/11/8.  * 5个一组释放  */ public class SemaphoreAbnormalDemo {     public static void main(String[] args){         Semaphore semaphore = new Semaphore(0);         for (int i = 0; i < 10; i++) {             Thread t = new MerryGoRound2("" + i + "",semaphore);             t.start();         }         try {             while (semaphore.availablePermits()!=0)             {                 Thread.sleep(5000);             }         } catch (InterruptedException e) {             e.printStackTrace();         }         System.out.println("Action go");         semaphore.release(5);         try {             while (semaphore.availablePermits()!=0)             {                 Thread.sleep(5000);             }         } catch (InterruptedException e) {             e.printStackTrace();         }         System.out.println("Action go again");         semaphore.release(5);     } } class MerryGoRound2 extends Thread {     private String name;     private Semaphore seat;     public MerryGoRound2(String name, Semaphore seat) {         this.name = name;         this.seat = seat;     }     @Override     public void run(){         int available = seat.availablePermits();         try {             if(available<=0) {                 System.out.println(Thread.currentThread().getId() + " " + name + " no more seat");             }             seat.acquire();             System.out.println(Thread.currentThread().getId() + " " + name + " get permit");         } catch (InterruptedException e) {             e.printStackTrace();         }     } }
    最新回复(0)