去过机场的人应该都会看到放行李的传送带,工作人员将托运的行李不停的放入传送带,乘客下飞机之后会不停的拿走行李。 这个例子和生产者/消费者设计模式相似。
这种设计模式需要满足以下三点要求:
(1)生产者生产数据到缓冲区中,消费者从缓冲区中取数据。 (2)如果缓冲区已经满了,则生产者线程阻塞; (3)如果缓冲区为空,那么消费者线程阻塞。
编写之前分析:
(1)定义一个缓存队列,选择一个集合当做缓存,给予缓存上限,缓存队列只有两种行为(生产数据和消费数据); (2)定义一个生产者线程,调用缓存队列中的生产行为; (3)定义一个消费者线程,调用缓存队列中的消费行为;
开始编写代码:
定义一个缓存队列
/** * 公共缓存队列 * 只做两件事:(1)生产;(2)消费 */ public class PublicQueue<T> { private int putIndex = 0;//数据插入的角标 private int maxCount = 50;//缓存区最大长度 private LinkedHashMap<Integer, T> linkedHashMap = new LinkedHashMap<>();//缓冲区 public synchronized void add(T msg){ if(linkedHashMap.size() == maxCount){ //如果缓存区达到最大数量,则阻塞生产者线程 try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } }else{ notifyAll();//唤醒所有线程 } linkedHashMap.put(putIndex, msg); System.out.println("生产一个产品,当前商品角标为:"+putIndex+"===文本为:"+msg+"===缓存长度为:"+linkedHashMap.size()); putIndex = (putIndex + 1 >= maxCount) ? (putIndex + 1) % maxCount : putIndex + 1; } public synchronized T remove(){ if(linkedHashMap.size() == 0){ //如果缓存区没有数据,则阻塞消费线程 try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } }else{ notifyAll();//唤醒所有线程 } Iterator it = linkedHashMap.entrySet().iterator(); T t = null; if(it.hasNext()){ Map.Entry<Integer, T> entry = (Map.Entry<Integer, T>) it.next(); t = entry.getValue(); int index = entry.getKey(); linkedHashMap.remove(index); System.out.println("消费一个产品,当前商品角标为:"+index+"===文本为:"+ t +"===缓存长度为:"+linkedHashMap.size()); } return t; } }定义一个生产者线程
/** * 生产者线程 */ public class ProducerThread extends Thread { private PublicQueue publicQueue; public ProducerThread(PublicQueue publicQueue){ this.publicQueue = publicQueue; } @Override public void run() { for(int i=0;i<60;i++){ publicQueue.add(String.valueOf(i)); } } }定义一个消费者
/** * 消费者线程 */ public class ConsumerThread extends Thread { private PublicQueue publicQueue; public ConsumerThread(PublicQueue publicQueue){ this.publicQueue = publicQueue; } @Override public void run() { for(;;){ publicQueue.remove(); } } }启动:
public class ProducerConsumerTest { public static void main(String[] args){ PublicQueue publicQueue = new PublicQueue(); ProducerThread producerThread = new ProducerThread(publicQueue); ConsumerThread consumerThread = new ConsumerThread(publicQueue); producerThread.start();//启动生产者线程 consumerThread.start();//启动消费者线程 } }代码分析:
(1)生产者/消费者设计模式顾名思义就是两个互斥线程,一个负责生产,一个负责消费,两者是线程不安全的; (2)这里选择使用LinkedHashMap作为缓存队列,LinkedHashMap是一个双向链表,用来处理线程不安全的数据,可以保证取出第一个数据,it.next()就是取出第一个数据。(LinkedHashMap可以保证遍历的顺序) (3)为了保证互斥线程的安全性,需要做对应的处理,以上代码使用了synchronized 、wait()、notifyAll()来保证。
那么本例中保证线程安全还有什么方案呢? (1)lock和condition的await、signalAll
/** * 公共缓存队列 * 只做两件事:(1)生产;(2)消费 */ public class PublicQueue<T> { private int putIndex = 0;//数据插入的角标 private int maxCount = 50;//缓存区最大长度 private Lock lock; private Condition addCondition; private Condition removeCondition; public PublicQueue(){ lock = new ReentrantLock(); addCondition = lock.newCondition(); removeCondition =lock.newCondition(); } private LinkedHashMap<Integer, T> linkedHashMap = new LinkedHashMap<>();//缓冲区 public void add(T msg){ try { lock.lock(); if (linkedHashMap.size() == maxCount){ //如果缓存区达到最大数量,则阻塞生产者线程 addCondition.await();//等待 } linkedHashMap.put(putIndex, msg); System.out.println("生产一个产品,当前商品角标为:"+putIndex+"===文本为:"+msg+"===缓存长度为:"+linkedHashMap.size()); putIndex = (putIndex + 1 >= maxCount) ? (putIndex + 1) % maxCount : putIndex + 1; removeCondition.signalAll();//唤醒所有线程 } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public T remove(){ T t = null; try { lock.lock(); if (linkedHashMap.size() == 0){ //如果缓存区没有数据,则阻塞消费线程 removeCondition.await();//等待 } Iterator it = linkedHashMap.entrySet().iterator(); if(it.hasNext()){ Map.Entry<Integer, T> entry = (Map.Entry<Integer, T>) it.next(); t = entry.getValue(); int index = entry.getKey(); linkedHashMap.remove(index); System.out.println("消费一个产品,当前商品角标为:"+index+"===文本为:"+ t +"===缓存长度为:"+linkedHashMap.size()); } addCondition.signalAll();//唤醒所有线程 } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } return t; } }(2)利用阻塞队列BlockingQueue (最简单的)
/** * 公共缓存队列 * 只做两件事:(1)生产;(2)消费 */ public class PublicQueue<T> { private BlockingDeque<T> blockingDeque = new LinkedBlockingDeque<>(50);//缓冲区 public void add(T msg){ try { blockingDeque.put(msg); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("生产一个产品,当前商品角标为:"+"===文本为:"+msg); } public T remove(){ T t = null; try { t = blockingDeque.take(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("消费一个产品,当前商品角标为:"+"===文本为:"+t); return t; } }总结:
有关缓存队列的处理有三种方法: (1)双向链表LinkedHashMap和synchronized结合; (2)双向链表LinkedHashMap和lock结合; (3)直接使用阻塞队列BlockingQueue。
作者:NoBugException 链接:https://www.jianshu.com/p/02ce3f75b3fe 来源:简书 简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。