声明:本文是《 Java 7 Concurrency Cookbook 》的第七章, 作者: Javier Fernández González 译者:郑玉婷
实现一个基于优先级的传输队列
Java 7 API 提供几种与并发应用相关的数据类型。从这里面,我们想来重点介绍以下2种数据类型:
LinkedTransferQueue:这个数据类型支持那些有生产者和消费者结构的程序。 在那些应用,你有一个或者多个数据生产者,一个或多个数据消费者和一个被生产者和消费者共享的数据类型。生产者把数据放入数据结构内,然后消费者从数据结构内提取数据。如果数据结构为空,消费者会被阻塞直到有数据可以消费。如果数据结构满了,生产者就会被阻塞直到有空位来放数据。PriorityBlockingQueue:在这个数据结构,元素是按照顺序储存的。元素们必须实现 带有 compareTo() 方法的 Comparable 接口。当你在结构中插入数据时,它会与数据元素对比直到找到它的位置。LinkedTransferQueue 的元素是按照抵达顺序储存的,所以越早到的越先被消耗。你有可能需要开发 producer/ consumer 程序,它的消耗顺序是由优先级决定的而不是抵达时间。在这个指南,你将学习如何实现在 producer/ consumer 问题中使用的数据结构,这些元素将被按照他们的优先级排序,级别高的会先被消耗。
准备
指南中的例子是使用Eclipse IDE 来实现的。如果你使用Eclipse 或者其他的IDE,例如NetBeans,打开并创建一个新的java任务。
怎么做呢…
按照这些步骤来实现下面的例子::
001//1. 创建一个类,名为 MyPriorityTransferQueue,扩展 PriorityBlockingQueue 类并实现 TransferQueue 接口。 002public class MyPriorityTransferQueue<E> extends PriorityBlockingQueue<E> implements TransferQueue<E> { 003 004//2. 声明一个私有 AtomicInteger 属性,名为 counter,用来储存正在等待元素的消费者的数量。 005private AtomicInteger counter; 006 007//3. 声明一个私有 LinkedBlockingQueue 属性,名为 transferred。 008private LinkedBlockingQueue<E> transfered; 009 010//4. 声明一个私有 ReentrantLock 属性,名为 lock。 011private ReentrantLock lock; 012 013//5. 实现类的构造函数,初始化它的属性值。 014public MyPriorityTransferQueue() { 015 counter=new AtomicInteger(0); 016 lock=new ReentrantLock(); 017 transfered=new LinkedBlockingQueue<E>(); 018} 019 020//6. 实现 tryTransfer() 方法。此方法尝试立刻发送元素给正在等待的消费者(如果可能)。如果没有任何消费者在等待,此方法返回 false 值。 021@Override 022public boolean tryTransfer(E e) { 023 lock.lock(); 024 boolean value; 025 if (counter.get()==0) { 026 value=false; 027 } else { 028 put(e); 029 value=true; 030 } 031 lock.unlock(); 032 return value; 033} 034 035//7. 实现 transfer() 方法。此方法尝试立刻发送元素给正在等待的消费者(如果可能)。如果没有任何消费者在等待, 036此方法把元素存入一个特殊queue,为了发送给第一个尝试获取一个元素的消费者并阻塞线程直到元素被消耗。 037@Override 038public void transfer(E e) throws InterruptedException { 039 lock.lock(); 040 if (counter.get()!=0) { 041 put(e); 042 lock.unlock(); 043 } else { 044 transfered.add(e); 045 lock.unlock(); 046 synchronized (e) { 047 e.wait(); 048 } 049 } 050} 051 052//8. 实现 tryTransfer() 方法,它接收3个参数: 元素,和需要等待消费者的时间(如果没有消费者的话),和用来注明时间的单位。如果有消费者在等待,立刻发送元素。否则,转化时间到毫秒并使用 wait() 方法让线程进入休眠。当消费者取走元素时,如果线程在 wait() 方法里休眠,你将使用 notify() 方法唤醒它。 053@Override 054public boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException { 055 lock.lock(); 056 if (counter.get()!=0) { 057 put(e); 058 lock.unlock(); 059 return true; 060 } else { 061 transfered.add(e); 062 long newTimeout= TimeUnit.MILLISECONDS.convert(timeout, unit); 063 lock.unlock(); 064 e.wait(newTimeout); 065 lock.lock(); 066 067 if (transfered.contains(e)) { 068 transfered.remove(e); 069 lock.unlock(); 070 return false; 071 } else { 072 lock.unlock(); 073 return true; 074 } 075 } 076} 077 078//9. 实现 hasWaitingConsumer() 方法。使用 counter 属性值来计算此方法的返回值。如果counter 的值大于0,放回 true。不然,返回 false。 079@Override 080public boolean hasWaitingConsumer() { 081 return (counter.get()!=0); 082} 083 084//10. 实现 getWaitingConsumerCount() 方法。返回counter 属性值。 085@Override 086public int getWaitingConsumerCount() { 087 return counter.get(); 088} 089 090//11.实现 take() 方法。此方法是当消费者需要元素时被消费者调用的。首先,获取之前定义的锁并增加在等待的消费者数量。 091@Override 092public E take() throws InterruptedException { 093 lock.lock(); 094 counter.incrementAndGet(); 095 096//12.如果在 transferred queue 中无任何元素。释放锁并使用 take() 方法尝试从queue中获取元素,此方法将让线程进入睡眠直到有元素可以消耗。 097E value=transfered.poll(); 098if (value==null) { 099 lock.unlock(); 100 value=super.take(); 101 lock.lock(); 102 103//13. 否则,从transferred queue 中取走元素并唤醒正在等待要消耗元素的线程(如果有的话)。 104} else { 105 synchronized (value) { 106 value.notify(); 107 } 108} 109 110//14. 最后,增加正在等待的消费者的数量并释放锁。 111counter.decrementAndGet(); 112lock.unlock(); 113return value; 114} 115 116//15. 实现一个类,名为 Event,扩展 Comparable 接口,把 Event 类参数化。 117public class Event implements Comparable<Event> { 118 119//16. 声明一个私有 String 属性,名为 thread,用来储存创建事件的线程的名字。 120private String thread; 121 122//17. 声明一个私有 int 属性,名为 priority,用来储存事件的优先级。 123private int priority; 124 125//18. 实现类的构造函数,初始化它的属性值。 126public Event(String thread, int priority){ 127 this.thread=thread; 128 this.priority=priority; 129} 130 131//19. 实现一个方法,返回 thread 属性值。 132public String getThread() { 133 return thread; 134} 135 136//20. 实现一个方法,返回 priority 属性值。 137public int getPriority() { 138return priority; 139} 140 141//21. 实现 compareTo() 方法。此方法把当前事件与接收到的参数事件进行对比。返回 -1,如果当前事件的优先级的级别高于参数;返回 1,如果当前事件的优先级低于参数;如果相等,则返回 0。你将获得一个按优先级递减顺序排列的list。有高等级的事件就会被排到queue的最前面。 142public int compareTo(Event e) { 143 if (this.priority>e.getPriority()) { 144 return -1; 145 } else if (this.priority<e.getPriority()) { 146 return 1; 147 } else { 148 return 0; 149 } 150} 151 152//22. 实现一个类,名为 Producer,它实现 Runnable 接口。 153public class Producer implements Runnable { 154 155//23. 声明一个私有 MyPriorityTransferQueue 属性,接收参数化的 Event 类属性,名为 buffer,用来储存这个生产者生成的事件。 156private MyPriorityTransferQueue<Event> buffer; 157 158//24. 实现类的构造函数,初始化它的属性值。 159public Producer(MyPriorityTransferQueue<Event> buffer) { 160 this.buffer=buffer; 161} 162 163//25. 这个类的实现 run() 方法。创建 100 个 Event 对象,用他们被创建的顺序决定优先级(越先创建的优先级越高)并使用 put() 方法把他们插入queue中。 164public void run() { 165 for (int i=0; i<100; i++) { 166 Event event=new Event(Thread.currentThread().getName(),i); 167 buffer.put(event); 168 } 169} 170 171//26. 实现一个类,名为 Consumer,它要实现 Runnable 接口。 172public class Consumer implements Runnable { 173 174//27. 声明一个私有 MyPriorityTransferQueue 属性,参数化 Event 类属性,名为 buffer,用来获取这个类的事件消费者。 175private MyPriorityTransferQueue<Event> buffer; 176 177//28. 实现类的构造函数,初始化它的属性值。 178public Consumer(MyPriorityTransferQueue<Event> buffer) { 179 this.buffer=buffer; 180} 181 182//29. 实现 run() 方法。它使用 take() 方法消耗1002 Events (这个例子实现的全部事件)并把生成事件的线程数量和它的优先级别写入操控台。 183@Override 184public void run() { 185 for (int i=0; i<1002; i++) { 186 try { 187 Event value=buffer.take(); 188 System.out.printf("Consumer: %s: %d\n",value. getThread(),value.getPriority()); 189 } catch (InterruptedException e) { 190 e.printStackTrace(); 191 } 192 } 193} 194 195//30. 创建例子的主类通过创建一个类,名为 Main 并添加 main()方法。 196public class Main { 197 198public static void main(String[] args) throws Exception { 199 200//31. 创建一个 MyPriorityTransferQueue 对象,名为 buffer。 201MyPriorityTransferQueue<Event> buffer=new MyPriorityTransferQu eue<Event>(); 202 203//32. 创建一个 Producer 任务并运行 10 线程来执行任务。 204Producer producer=new Producer(buffer); 205Thread producerThreads[]=new Thread[10]; 206for (int i=0; i<producerThreads.length; i++) { 207 producerThreads[i]=new Thread(producer); 208 producerThreads[i].start(); 209} 210 211//33.创建并运行一个 Consumer 任务。 212Consumer consumer=new Consumer(buffer); 213Thread consumerThread=new Thread(consumer); 214consumerThread.start(); 215 216//34. 写入当前的消费者数量。 217System.out.printf("Main: Buffer: Consumer count: %d\n",buffer. getWaitingConsumerCount()); 218 219//35. 使用 transfer() 方法传输一个事件给消费者。 220Event myEvent=new Event("Core Event",0); 221buffer.transfer(myEvent); 222System.out.printf("Main: My Event has ben transfered.\n"); 223 224//36. 使用 join() 方法等待生产者的完结。 225for (int i=0; i<producerThreads.length; i++) { 226 try { 227 producerThreads[i].join(); 228 } catch (InterruptedException e) { 229 e.printStackTrace(); 230 } 231} 232 233//37. 让线程休眠1秒。 234TimeUnit.SECONDS.sleep(1); 235 236//38.写入当前的消费者数量。 237System.out.printf("Main: Buffer: Consumer count: %d\n",buffer. getWaitingConsumerCount()); 238 239//39. 使用 transfer() 方法传输另一个事件。 240myEvent=new Event("Core Event 2",0); 241buffer.transfer(myEvent); 242 243//40. 使用 join() 方法等待消费者完结。 244consumerThread.join(); 245 246//41. 写信息表明程序结束。 247System.out.printf("Main: End of the program\n");它是怎么工作的…
在这个指南,你已经实现了 MyPriorityTransferQueue 数据结构。这个数据类型是在 producer/consumer 问题中使用的,它的元素是按照优先级排列的。由于 Java 不支持多个继承,所以你首先要决定的是 MyPriorityTransferQueue 类的基类。你扩展了 PriorityBlockingQueue 类,来实现在结构中插入数据按照优先级排序。你也实现了 TransferQueue 接口,添加了与 producer/consumer 相关的3个方法。
MyPriortyTransferQueue 类有以下2个属性:
AtomicInteger 属性,名为 counter: 此属性储存了正在等待从数据类型提取元素的消费者的数量。当一个消费者调用 take()操作来从数据类型中提取元素时,counter 数增加。当消费者结束 take() 操作的执行时,counter 数再次增加。在 hasWaitingConsumer() 和 getWaitingConsumerCount() 方法的实现中使用到了 counter。ReentrantLock 属性,名为 lock: 此属性是用来控制访问已实现的操作。只有一个线程可以用数据类型。最后一个,LinkedBlockingQueue list 用来储存传输的元素。在 MyPriorityTransferQueue 中,你实现了一些方法。全部方法都在 TransferQueue 接口中声明了和在PriorityBlockingQueue 接口实现的 take() 方法。在之前已经描述了2个方法了。来看看剩下的方法的描述:
tryTransfer(E e): 此方法尝试直接发送元素给消费者。如果有消费者在等待,此方法储存元素到 priority queue 中为了立刻提供给消费者,并返回 true 值。如果没有消费者在等待,方法返回 false 值。transfer(E e): 此方法直接发送元素给消费者。如果有消费者在等待,此方法储存元素到 priority queue 中为了立刻提供给消费者。否则,把元素储存到已传输的元素list 并阻塞线程直到元素被消耗。当线程进入休眠时,你要释放锁,如果不的话,你就阻塞了queue。
tryTransfer(E e, long timeout, TimeUnit unit): 此方法与 transfer() 方法相似,只是它的线程被阻塞的时间段是由参数决定的。当线程进入休眠时,你要释放锁,如果不的话,你就阻塞了queue。take(): 此方法返回下一个要被消耗的元素。如果在 transferred 元素list中有元素,就从list中取走元素。否则,就从 priority queue 中取元素。一旦你实现了数据类型,你就实现了 Event 类。它就是在数据类型里储存的元素构成的类。Event 类有2个属性用来储存生产者的ID和事件的优先级,并实现了 Comparable 接口,为了满足你的数据类型的需要。
接着,你实现了 Producer 和 Consumer 类。在这个例子中,你有 10 个生产者和一个消费者,他们共享同一个 buffer。每个生产者生成100个事件,他们的优先级是递增的, 所以有高优先级的事件在越后面才生成。
例子的主类创建了一个 MyPriorityTransferQueue 对象,10个生产者,和一个消费者,然后使用MyPriorityTransferQueue buffer 的 transfer() 方法来传输2个事件到 buffer。
以下截图是程序运行的部分输出:
你可以发现有着高级别的事件如何先被消费,和一个消费者如何消费传输的事件。
参见第六章,并发集:Using blocking thread-safe lists ordered by priority第六章,并发集:Using blocking thread-safe lists
文章转自 并发编程网-ifeve.com