JAVA多线程基础 之六 多线程间通信

    xiaoxiao2022-07-02  123

    wait与notify

    使用wait和notify方法实现线程间的通信(属于Object的方法)

    必须配合synchronized关键字使用。

    wait方法释放锁,notify方法不释放锁。

    public class WaitNotifyTest {// 在多线程间共享的对象上使用wait     private String[] shareObj = { "true" };     public static void main(String[] args) {         WaitNotifyTest test = new WaitNotifyTest();         ThreadWait threadWait1 = test.new ThreadWait("wait thread1");         threadWait1.setPriority(2);         ThreadWait threadWait2 = test.new ThreadWait("wait thread2");         threadWait2.setPriority(3);         ThreadWait threadWait3 = test.new ThreadWait("wait thread3");         threadWait3.setPriority(4);         ThreadNotify threadNotify = test.new ThreadNotify("notify thread");         threadNotify.start();         threadWait1.start();         threadWait2.start();         threadWait3.start();     }     class ThreadWait extends Thread {         public ThreadWait(String name){             super(name);         }         public void run() {             synchronized (shareObj) {                 while ("true".equals(shareObj[0])) {                     System.out.println("线程"+ this.getName() + "开始等待");                     long startTime = System.currentTimeMillis();                     try {                         shareObj.wait();                     } catch (InterruptedException e) {                         e.printStackTrace();                     }                     long endTime = System.currentTimeMillis();                     System.out.println("线程" + this.getName()                             + "等待时间为:" + (endTime - startTime));                 }             }             System.out.println("线程" + getName() + "等待结束");         }     }     class ThreadNotify extends Thread {         public ThreadNotify(String name){             super(name);         }         public void run() {             try {                 // 给等待线程等待时间                 sleep(3000);             } catch (InterruptedException e) {                 e.printStackTrace();             }             synchronized (shareObj) {                 System.out.println("线程" + this.getName() + "开始准备通知");                 shareObj[0] = "false";                 System.out.println(shareObj[0] );                 shareObj.notifyAll();                 System.out.println("线程" + this.getName() + "通知结束");             }             System.out.println("线程" + this.getName() + "运行结束");         }     } }

    模拟实现队列

    public class LinkedBlockingQueueDemo {     //1。需要一个集合     List<Object> list = new ArrayList<>();     //2.计数器,记录容器里元素的个数     AtomicInteger count = new AtomicInteger();     //3.模拟有界队列需要上限和下限     final int miniSize = 0;     final int maxSize;     public LinkedBlockingQueueDemo(int maxSize) {         this.maxSize = maxSize;     }     //4.使用wait()¬ify()需要 synchronized ,所以需要一个对象进行加锁     final Object lock = new Object();     //5.put方法     //队列满时方法阻断,直到队列有元素被取走     public void put(Object obj) {         //5-1 加入sychronized代码块         synchronized (lock){             while(count.get() == maxSize){                 try {                     System.out.println("empty");                     lock.wait();                 } catch (InterruptedException e) {                     e.printStackTrace();                 }             }             System.out.println("put start --------");             //加入队列             list.add(obj);             System.out.println("put : " + obj);             //计数器累加             count.incrementAndGet();             System.out.println("put end --------");             //通知另外的线程             lock.notify();         }     }     //6.take方法     //队列空时方法阻断,直到队列有新的元素加入     public void take() {         synchronized (lock){             while(count.get() == miniSize){                 try {                     System.out.println("full");                     lock.wait();                 } catch (InterruptedException e) {                     e.printStackTrace();                 }             }             System.out.println("take start --------");         //移除元素         list.remove(0);             System.out.println("take : " + count);         //计数器减少         count.decrementAndGet();             System.out.println("take end --------");         //通知另外的线程         lock.notify();         }     }     public static void main(String[] args){         LinkedBlockingQueueDemo linkedBlockingQueue = new LinkedBlockingQueueDemo(6);         Thread t1 = new Thread(new Runnable() {             @Override             public void run() {                 for (int i = 0; i < 10; i++) {                     linkedBlockingQueue.put(i);                 }             }         });         Thread t2 = new Thread(new Runnable() {             @Override             public void run() {                 for (int i = 0; i < 10; i++) {                     linkedBlockingQueue.take();                 }             }         });         t1.start();         t2.start();     } }

    Condition

    Condition condition = lock.newCondition();

    res. condition.await();  //类似wait

    res. Condition. Signal(); //类似notify

     

    例子:

    用Lock和condition替换sychronized和wait/notify

    package com.test.threads; import java.util.Calendar; import java.util.Random; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /**  * Created by zhanghaipeng on 2018/11/6.  */ public class ConditionDemo {     public static void main(String[] args){         final Student5 student = new Student5();         Condition condition = student.reentrantLock.newCondition();         Thread inThread = new InStudent5(student,condition);         Thread outThread = new OutStudent5(student,condition);         outThread.start();         inThread.start();     } } class InStudent5 extends Thread{     private Student5 student;     Condition condition ;     public InStudent5(Student5 student,Condition condition){         this.student = student;         this.condition = condition;     }     public void run() {         while(true)         {             try { //                synchronized (student) {                 student.reentrantLock.lock();                     int count = new Random().nextInt();                     if (!student.flag) //                        student.wait();                     condition.await();                     if (count % 2 == 0) {                         student.setName("妞妞");                         student.setSex("");                     } else {                         student.setName("牛牛");                         student.setSex("");                     }                     System.out.println(Thread.currentThread().getId() + student.toString());                     sleep(1000);                     student.flag = false; //                    student.notify();                 condition.signal(); //                }             } catch (InterruptedException e) {             e.printStackTrace();             }finally {                 student.reentrantLock.unlock();             }         }     } } class OutStudent5 extends Thread{     private Student5 student;     Condition condition ;     public OutStudent5(Student5 student,Condition condition){         this.student = student;         this.condition = condition;     }     @Override     public void run() {         while (true) {             try { //                synchronized (student) {                 student.reentrantLock.lock();                     if (student.flag) { //                        student.wait();                         condition.await();                     }                     System.out.println(Thread.currentThread().getId() + student.toString());                     sleep(1000);                     student.flag = true; //                    student.notify();                 condition.signal(); //                }             } catch (InterruptedException e) {                 e.printStackTrace();             }finally {                 student.reentrantLock.unlock();             }         }     } } class Student5 {     private String name;     private String sex;     ReentrantLock reentrantLock = new ReentrantLock();     //    //false true 可写     public static volatile boolean flag = true;     public String getSex() {         return sex;     }     public void setSex(String sex) {         this.sex = sex;     }     public String getName() {         return name;     }     public void setName(String name) {         this.name = name;     }     @Override     public String toString() {         return " { name='" + name + '\'' +                 ", sex='" + sex + '\'' +                 '}';     } }

    Condition条件事ReentrantLock重入锁的好搭档,ReentrantLock锁的 new Condition() 方法可以生成一个与当前相关Condition实例。利用Condition对象,我们就可以让线程在合适的时候等待,或者在某一个特定的时刻让线程得到通知,继续执行

    以上方法的含义如下:

       await()方法会使当前线程等待,同时释放当前锁,当其他线程使用signal()或者signalAll()方法时,线程会重新获得锁并继续执行。或者当线程被中断时,也能跳出等待,这和Object.wait()方法很相似;

       awaitUninterruptibly()方法与await()方法基本相同,但是它不会在等待过程中响应中断;

       awaitNanos(long nanosTimeout) nanosTimeout为等待的最大时间,单位为纳秒(ns),如果在时间内被唤醒,则返回nanosTimeout减去已等待的时间;如果在时间内没有被唤醒,则返回0或者负数;该方法在等待时间内也会相应中断;

       await(long time,TimeUnit unit) await()基本一致,但不同的是在时间内未被唤醒或者被中断都会返回false,其余情况返回true

       awaitUntil() awaitNanos(long nanosTimeout)一致,不同的是它不是等待时间段,而是时间到达参数指定的某一时刻;

       signal()方法用于唤醒一个在等待中的线程。相对的signalAll()方法会唤醒所有在等待中的线程,这和Object.notify()方法很类似;

     

    Condition具有比wait()/notify()更好的灵活性,具体体现在:

       一个锁实例,可以绑定多个Condition实例,实现多路通知;

       notify()方法进行通知时,是随机进行选择的,但重入锁结合Condition对象,可以实现有选择性的通知,这是非常重要的。

    最新回复(0)