CyclicBarrier详解

    xiaoxiao2024-11-04  81

    CyclicBarrier介绍

    CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier),类似于CountDownLatch也是个计数器,不同的是CyclicBarrier要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。之所以用循环修饰,是因为在所有的线程释放彼此之后,这个屏障是可以重新使用的(reset()方法重置屏障点)。

    CyclicBarrier,让一组线程到达一个同步点后再一起继续运行,在其中任意一个线程未达到同步点,其他到达的线程均会被阻塞。

    CyclicBarrier是一种同步机制允许一组线程相互等待,等到所有线程都到达一个屏障点才退出await方法,它没有直接实现AQS而是借助ReentrantLock来实现的同步机制。它是可循环使用的,而CountDownLatch是一次性的,另外它体现的语义也跟CountDownLatch不同,CountDownLatch减少计数到达条件采用的是release方式,而CyclicBarrier走向屏障点(await)采用的是Acquire方式,Acquire是会阻塞的,这也实现了CyclicBarrier的另外一个特点,只要有一个线程中断那么屏障点就被打破,所有线程都将被唤醒(CyclicBarrier自己负责这部分实现,不是由AQS调度的),这样也避免了因为一个线程中断引起永远不能到达屏障点而导致其他线程一直等待。屏障点被打破的CyclicBarrier将不可再使用(会抛出BrokenBarrierException)除非执行reset操作。

    CountDownLatch:一个或者多个线程,等待其他多个线程完成某件事情之后才能执行

    CyclicBarrier:多个线程互相等待,直到到达同一个同步点,再继续一起执行

    CyclicBarrier示例

    package main.java.study; import java.util.Random; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; public class CyclicBarrierDemo { public static class Soldier implements Runnable { private String soldier; private final CyclicBarrier cyclic; public Soldier(CyclicBarrier cyclic ,String soldier) { this.cyclic=cyclic; this.soldier = soldier; } @Override public void run() { try { cyclic.await(); doWork(); cyclic.await(); } catch (InterruptedException | BrokenBarrierException e) { // TODO: handle exception e.printStackTrace(); } } void doWork() { try { Thread.sleep(new Random().nextInt(10) * 1000); System.out.println(soldier + " done!"); } catch (InterruptedException e) { // TODO: handle exception e.printStackTrace(); } } } public static class BarrierRun implements Runnable { boolean flag = false; int N; public BarrierRun(boolean flag,int n) { this.flag=flag; this.N =n; } @Override public void run() { if (flag) { System.out.println("soldier" + N + " done!"); } else { System.out.println("soldier" + N + " collected!"); flag = true; } } } public static void main (String[] args) { final int N = 10; Thread[] all= new Thread[N]; boolean flag=false; CyclicBarrier cyclic = new CyclicBarrier(N, new BarrierRun(flag, N)); System.out.println("begin gather:"); for(int i = 0;i< N ;i++) { System.out.println("soldier:" + i + " coming."); all[i] = new Thread( new Soldier(cyclic, "solder" + i)); all[i].start(); } } }

    执行结果:

    begin gather: soldier:0 coming. soldier:1 coming. soldier:2 coming. soldier:3 coming. soldier:4 coming. soldier:5 coming. soldier:6 coming. soldier:7 coming. soldier:8 coming. soldier:9 coming. soldier10 collected! solder3 done! solder9 done! solder0 done! solder7 done! solder5 done! solder4 done! solder2 done! solder1 done! solder8 done! solder6 done! soldier10 done!

    Soldier执行了2次 await()方法,第1次被唤醒,执行doWork(),然后再次await(),

    CyclicBarrier源码

    CyclicBarrier方法

    //默认构造方法,参数表示拦截的线程数量 public CyclicBarrier(int parties) { this(parties, null); } //用于在线程到达同步点时,优先执行线程barrierAction public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } //可中断等待 public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } //可超时等待 public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); }

     

    //打破屏障 private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); } //屏障是否被打破 public boolean isBroken() { final ReentrantLock lock = this.lock; lock.lock(); try { return generation.broken; } finally { lock.unlock(); } } //重置 public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { breakBarrier(); // break the current generation nextGeneration(); // start a new generation } finally { lock.unlock(); } } //获取正在barrir处等待数,即已经到达数 public int getNumberWaiting() { final ReentrantLock lock = this.lock; lock.lock(); try { return parties - count; } finally { lock.unlock(); } }

    CyclicBarrier的数据结构

    //用于标记每次屏障 private static class Generation { boolean broken = false; } /** The lock for guarding barrier entry */ private final ReentrantLock lock = new ReentrantLock(); /** 等待直到全部到达 */ private final Condition trip = lock.newCondition(); /** The number of parties */ private final int parties; /* The command to run when tripped */ private final Runnable barrierCommand; /** The current generation */ private Generation generation = new Generation(); //仍然在被等待数,即未到达barrir处数 private int count;

    CyclicBarrier等待与唤醒

    private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; //如果屏障已被打破,则退出 if (g.broken) throw new BrokenBarrierException(); //如果发生中断,则打破屏障 if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } //未到达数减1 int index = --count; if (index == 0) { // tripped //全部都已到达,则继续 boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); //如果有优先执行的命令,则运行 ranAction = true; //Action已执行 nextGeneration(); //准备下一次设置屏障 return 0; } finally { if (!ranAction) //如果Action执行失败,则打破屏障 breakBarrier(); } } // 在屏障处等待............ for (;;) { try { if (!timed) //未设置超时 trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); //设置超时 } catch (InterruptedException ie) { if (g == generation && ! g.broken) { //仍是本次barrir,并且屏障未打破,则打破屏障。 breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) //不是本次barrir,则返回 return index; if (timed && nanos <= 0L) { //超时则打破屏障 breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }

        private void nextGeneration() {         // signal completion of last generation         trip.signalAll();         // set up next generation         count = parties;         generation = new Generation();     }

        /**      * Sets current barrier generation as broken and wakes up everyone.      * Called only while holding lock.      */     private void breakBarrier() {         generation.broken = true;         count = parties;         trip.signalAll();     }

    ReentrantLock参考: ConditionObject源码

    最新回复(0)