CyclicBarrier是一个同步辅助类,允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。
public class CyclicBarrierTest { public static Map<String,Integer> result = new ConcurrentHashMap<>(); public static CyclicBarrier barrier = new CyclicBarrier(4,()->{ System.out.println("汇总任务启动"); int rs = 0; for(Map.Entry<String,Integer> entry:result.entrySet()){ System.out.println(entry.getKey()+":"+entry.getValue()); rs += entry.getValue(); } System.out.println("rs="+rs); }); public static void main(String[] args) { for(int i=0;i<4;i++){ Thread t = new Thread(()->{ System.out.println(Thread.currentThread().getName()+"启动"); try { Random random = new Random(); result.put(Thread.currentThread().getName(),random.nextInt(10)); barrier.await(); System.out.println(Thread.currentThread().getName()+"唤醒后继续执行"); // 再次执行barrier.await() 还会触发汇总任务 // barrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }); t.start(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }打印结果
Thread-0启动 Thread-1启动 Thread-2启动 Thread-3启动 汇总任务启动 Thread-3:4 Thread-0:1 Thread-1:8 Thread-2:5 rs=18 Thread-3唤醒后继续执行 Thread-2唤醒后继续执行 Thread-1唤醒后继续执行 Thread-0唤醒后继续执行object.wait()和object.notifyAll()实现了通知,需配合synchronized使用。 condition.await()和condition.signalAll()和上面类似,需配合ReentrantLock使用。
public class ConditionTest { public static Lock lock = new ReentrantLock(); public static Condition cond1 = lock.newCondition(); public static Condition cond2 = lock.newCondition(); public static void main(String[] args) { for(int i=0;i<3;i++) { Thread t1 = new Thread(() -> { lock.lock(); try { // 类似于object.wait() 释放锁,等待被唤醒然后继续执行 if(Thread.currentThread().getId()==13){ System.out.println(Thread.currentThread().getName() + "获得锁,cond1.await()"); cond1.await(); }else{ System.out.println(Thread.currentThread().getName() + "获得锁,cond2.await()"); cond2.await(); } System.out.println(Thread.currentThread().getName() + "被唤醒,且带着锁休眠1s"); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }); t1.start(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } // 唤醒所有线程,首先得获取锁,signal后得释放锁 lock.lock(); cond1.signalAll(); System.out.println("唤醒cond1所有线程,同时释放锁"); lock.unlock(); } }可以唤醒指定的与condition绑定的线程
Thread-0获得锁,cond2.await() Thread-1获得锁,cond2.await() Thread-2获得锁,cond1.await() 唤醒cond1所有线程,同时释放锁 Thread-2被唤醒,且带着锁休眠1s具体逻辑实现是
定义变量 lock , trip , generation , barrierCommand , parties,count 等等通过构造函数获取必须同时到达barrier的线程个数(parties)和parties个线程到达barrier时,会执行的动作(barrierCommand)如果未达到线程个数,则执行trip.await() 方法(即condition.await()),释放锁,等待被唤醒。count–如果达到线程个数,执行barrierCommand,trip.signalAll() 唤醒线程并更新generation,parties,count,意味着新的一轮计数开始。 定义变量 private static class Generation { boolean broken = false; } /** The lock for guarding barrier entry */ private final ReentrantLock lock = new ReentrantLock(); /** Condition to wait on until tripped */ private final Condition trip = lock.newCondition(); /** The number of parties */ private final int parties; /* The command to run when tripped */ private final Runnable barrierCommand; /** The current generation */ private Generation generation = new Generation(); /** * Number of parties still waiting. Counts down from parties to 0 * on each generation. It is reset to parties on each new * generation or when broken. */ private int count; 构造函数 public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); // parties表示“必须同时到达barrier的线程个数”。 this.parties = parties; // count表示“处在等待状态的线程个数”。 this.count = parties; // barrierCommand表示“parties个线程到达barrier时,会执行的动作”。 this.barrierCommand = barrierAction; } await() private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; // 获取“独占锁(lock)” lock.lock(); try { // 保存“当前的generation” final Generation g = generation; // 若“当前generation已损坏”,则抛出异常。 if (g.broken) throw new BrokenBarrierException(); // 如果当前线程被中断,则通过breakBarrier()终止CyclicBarrier,唤醒CyclicBarrier中所有等待线程。 if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } // 将“count计数器”-1 int index = --count; // 如果index=0,则意味着“有parties个线程到达barrier”。 if (index == 0) { // tripped boolean ranAction = false; try { // 如果barrierCommand不为null,则执行该动作。 final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; // 唤醒所有等待线程,并更新generation。 nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // 当前线程一直阻塞,直到“有parties个线程到达barrier” 或 “当前线程被中断” 或 “超时”这3者之一发生, // 当前线程才继续执行。 for (;;) { try { // 如果不是“超时等待”,则调用awati()进行等待;否则,调用awaitNanos()进行等待。 if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { // 如果等待过程中,线程被中断,则执行下面的函数。 if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { Thread.currentThread().interrupt(); } } // 如果“当前generation已经损坏”,则抛出异常。 if (g.broken) throw new BrokenBarrierException(); // 如果“generation已经换代”,则返回index。 if (g != generation) return index; // 如果是“超时等待”,并且时间已到,则通过breakBarrier()终止CyclicBarrier,唤醒CyclicBarrier中所有等待线程。 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { // 释放“独占锁(lock)” lock.unlock(); } } nextGeneration private void nextGeneration() { // signal completion of last generation trip.signalAll(); // set up next generation count = parties; generation = new Generation(); }