定制并发类(十)实现一个基于优先级的传输队列

    xiaoxiao2024-08-22  99

    声明:本文是《 Java 7 Concurrency Cookbook 》的第七章, 作者: Javier Fernández González 译者:郑玉婷

    实现一个基于优先级的传输队列

    Java 7 API 提供几种与并发应用相关的数据类型。从这里面,我们想来重点介绍以下2种数据类型:

    LinkedTransferQueue:这个数据类型支持那些有生产者和消费者结构的程序。 在那些应用,你有一个或者多个数据生产者,一个或多个数据消费者和一个被生产者和消费者共享的数据类型。生产者把数据放入数据结构内,然后消费者从数据结构内提取数据。如果数据结构为空,消费者会被阻塞直到有数据可以消费。如果数据结构满了,生产者就会被阻塞直到有空位来放数据。PriorityBlockingQueue:在这个数据结构,元素是按照顺序储存的。元素们必须实现 带有 compareTo() 方法的 Comparable 接口。当你在结构中插入数据时,它会与数据元素对比直到找到它的位置。

    LinkedTransferQueue 的元素是按照抵达顺序储存的,所以越早到的越先被消耗。你有可能需要开发 producer/ consumer 程序,它的消耗顺序是由优先级决定的而不是抵达时间。在这个指南,你将学习如何实现在 producer/ consumer 问题中使用的数据结构,这些元素将被按照他们的优先级排序,级别高的会先被消耗。

    准备

    指南中的例子是使用Eclipse IDE 来实现的。如果你使用Eclipse 或者其他的IDE,例如NetBeans,打开并创建一个新的java任务。

    怎么做呢…

    按照这些步骤来实现下面的例子::

    001//1.   创建一个类,名为 MyPriorityTransferQueue,扩展 PriorityBlockingQueue 类并实现 TransferQueue 接口。 002public class MyPriorityTransferQueue<E> extends PriorityBlockingQueue<E> implements TransferQueue<E> { 003  004//2.   声明一个私有 AtomicInteger 属性,名为 counter,用来储存正在等待元素的消费者的数量。 005private AtomicInteger counter; 006  007//3.   声明一个私有 LinkedBlockingQueue 属性,名为 transferred。 008private LinkedBlockingQueue<E> transfered; 009  010//4.   声明一个私有 ReentrantLock 属性,名为 lock。 011private ReentrantLock lock; 012  013//5.   实现类的构造函数,初始化它的属性值。 014public MyPriorityTransferQueue() { 015     counter=new AtomicInteger(0); 016     lock=new ReentrantLock(); 017     transfered=new LinkedBlockingQueue<E>(); 018} 019  020//6.   实现 tryTransfer() 方法。此方法尝试立刻发送元素给正在等待的消费者(如果可能)。如果没有任何消费者在等待,此方法返回 false 值。 021@Override 022public boolean tryTransfer(E e) { 023     lock.lock(); 024     boolean value; 025     if (counter.get()==0) { 026         value=false; 027     } else { 028         put(e); 029         value=true; 030     } 031     lock.unlock(); 032     return value; 033} 034  035//7.    实现 transfer() 方法。此方法尝试立刻发送元素给正在等待的消费者(如果可能)。如果没有任何消费者在等待, 036此方法把元素存入一个特殊queue,为了发送给第一个尝试获取一个元素的消费者并阻塞线程直到元素被消耗。 037@Override 038public void transfer(E e) throws InterruptedException { 039     lock.lock(); 040     if (counter.get()!=0) { 041         put(e); 042         lock.unlock(); 043     } else { 044         transfered.add(e); 045         lock.unlock(); 046         synchronized (e) { 047             e.wait(); 048         } 049     } 050} 051  052//8.   实现 tryTransfer() 方法,它接收3个参数: 元素,和需要等待消费者的时间(如果没有消费者的话),和用来注明时间的单位。如果有消费者在等待,立刻发送元素。否则,转化时间到毫秒并使用 wait() 方法让线程进入休眠。当消费者取走元素时,如果线程在 wait() 方法里休眠,你将使用 notify() 方法唤醒它。 053@Override 054public boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException { 055     lock.lock(); 056     if (counter.get()!=0) { 057         put(e); 058         lock.unlock(); 059         return true; 060     } else { 061         transfered.add(e); 062         long newTimeout= TimeUnit.MILLISECONDS.convert(timeout, unit); 063         lock.unlock(); 064         e.wait(newTimeout); 065         lock.lock(); 066  067         if (transfered.contains(e)) { 068             transfered.remove(e); 069             lock.unlock(); 070             return false; 071         } else { 072             lock.unlock(); 073             return true; 074         } 075     } 076} 077  078//9.   实现 hasWaitingConsumer() 方法。使用 counter 属性值来计算此方法的返回值。如果counter 的值大于0,放回 true。不然,返回 false。 079@Override 080public boolean hasWaitingConsumer() { 081     return (counter.get()!=0); 082} 083  084//10. 实现 getWaitingConsumerCount() 方法。返回counter 属性值。 085@Override 086public int getWaitingConsumerCount() { 087     return counter.get(); 088} 089  090//11.实现 take() 方法。此方法是当消费者需要元素时被消费者调用的。首先,获取之前定义的锁并增加在等待的消费者数量。 091@Override 092public E take() throws InterruptedException { 093     lock.lock(); 094     counter.incrementAndGet(); 095  096//12.如果在 transferred queue 中无任何元素。释放锁并使用 take() 方法尝试从queue中获取元素,此方法将让线程进入睡眠直到有元素可以消耗。 097E value=transfered.poll(); 098if (value==null) { 099     lock.unlock(); 100     value=super.take(); 101     lock.lock(); 102  103//13. 否则,从transferred queue 中取走元素并唤醒正在等待要消耗元素的线程(如果有的话)。 104} else { 105     synchronized (value) { 106         value.notify(); 107     } 108} 109  110//14. 最后,增加正在等待的消费者的数量并释放锁。 111counter.decrementAndGet(); 112lock.unlock(); 113return value; 114} 115  116//15. 实现一个类,名为 Event,扩展 Comparable 接口,把 Event 类参数化。 117public class Event implements Comparable<Event> { 118  119//16. 声明一个私有 String 属性,名为 thread,用来储存创建事件的线程的名字。 120private String thread; 121  122//17.  声明一个私有 int 属性,名为 priority,用来储存事件的优先级。 123private int priority; 124  125//18. 实现类的构造函数,初始化它的属性值。 126public Event(String thread, int priority){ 127     this.thread=thread; 128     this.priority=priority; 129} 130  131//19. 实现一个方法,返回 thread 属性值。 132public String getThread() { 133     return thread; 134} 135  136//20. 实现一个方法,返回 priority  属性值。 137public int getPriority() { 138return priority; 139} 140  141//21. 实现 compareTo() 方法。此方法把当前事件与接收到的参数事件进行对比。返回 -1,如果当前事件的优先级的级别高于参数;返回 1,如果当前事件的优先级低于参数;如果相等,则返回 0。你将获得一个按优先级递减顺序排列的list。有高等级的事件就会被排到queue的最前面。 142public int compareTo(Event e) { 143     if (this.priority>e.getPriority()) { 144         return -1; 145     } else if (this.priority<e.getPriority()) { 146         return 1; 147     } else { 148         return 0; 149     } 150} 151  152//22. 实现一个类,名为 Producer,它实现 Runnable 接口。 153public class Producer implements Runnable { 154  155//23. 声明一个私有 MyPriorityTransferQueue 属性,接收参数化的 Event  类属性,名为 buffer,用来储存这个生产者生成的事件。 156private MyPriorityTransferQueue<Event> buffer; 157  158//24. 实现类的构造函数,初始化它的属性值。 159public Producer(MyPriorityTransferQueue<Event> buffer) { 160     this.buffer=buffer; 161} 162  163//25. 这个类的实现 run() 方法。创建 100 个 Event 对象,用他们被创建的顺序决定优先级(越先创建的优先级越高)并使用 put() 方法把他们插入queue中。 164public void run() { 165     for (int i=0; i<100; i++) { 166         Event event=new Event(Thread.currentThread().getName(),i); 167         buffer.put(event); 168     } 169} 170  171//26. 实现一个类,名为 Consumer,它要实现 Runnable 接口。 172public class Consumer implements Runnable { 173  174//27.  声明一个私有 MyPriorityTransferQueue 属性,参数化 Event 类属性,名为 buffer,用来获取这个类的事件消费者。 175private MyPriorityTransferQueue<Event> buffer; 176  177//28. 实现类的构造函数,初始化它的属性值。 178public Consumer(MyPriorityTransferQueue<Event> buffer) { 179     this.buffer=buffer; 180} 181  182//29. 实现 run() 方法。它使用 take() 方法消耗1002 Events (这个例子实现的全部事件)并把生成事件的线程数量和它的优先级别写入操控台。 183@Override 184public void run() { 185     for (int i=0; i<1002; i++) { 186         try { 187             Event value=buffer.take(); 188             System.out.printf("Consumer: %s: %d\n",value. getThread(),value.getPriority()); 189         } catch (InterruptedException e) { 190             e.printStackTrace(); 191         } 192     } 193} 194  195//30. 创建例子的主类通过创建一个类,名为 Main 并添加 main()方法。 196public class Main { 197  198public static void main(String[] args) throws Exception { 199  200//31. 创建一个 MyPriorityTransferQueue 对象,名为 buffer。 201MyPriorityTransferQueue<Event> buffer=new MyPriorityTransferQu eue<Event>(); 202  203//32. 创建一个 Producer 任务并运行 10 线程来执行任务。 204Producer producer=new Producer(buffer); 205Thread producerThreads[]=new Thread[10]; 206for (int i=0; i<producerThreads.length; i++) { 207     producerThreads[i]=new Thread(producer); 208     producerThreads[i].start(); 209} 210  211//33.创建并运行一个 Consumer 任务。 212Consumer consumer=new Consumer(buffer); 213Thread consumerThread=new Thread(consumer); 214consumerThread.start(); 215  216//34. 写入当前的消费者数量。 217System.out.printf("Main: Buffer: Consumer count: %d\n",buffer. getWaitingConsumerCount()); 218  219//35. 使用 transfer() 方法传输一个事件给消费者。 220Event myEvent=new Event("Core Event",0); 221buffer.transfer(myEvent); 222System.out.printf("Main: My Event has ben transfered.\n"); 223  224//36. 使用 join() 方法等待生产者的完结。 225for (int i=0; i<producerThreads.length; i++) { 226     try { 227         producerThreads[i].join(); 228     } catch (InterruptedException e) { 229         e.printStackTrace(); 230     } 231} 232  233//37.  让线程休眠1秒。 234TimeUnit.SECONDS.sleep(1); 235  236//38.写入当前的消费者数量。 237System.out.printf("Main: Buffer: Consumer count: %d\n",buffer. getWaitingConsumerCount()); 238  239//39. 使用 transfer() 方法传输另一个事件。 240myEvent=new Event("Core Event 2",0); 241buffer.transfer(myEvent); 242  243//40. 使用 join() 方法等待消费者完结。 244consumerThread.join(); 245  246//41. 写信息表明程序结束。 247System.out.printf("Main: End of the program\n");

    它是怎么工作的…

    在这个指南,你已经实现了 MyPriorityTransferQueue 数据结构。这个数据类型是在 producer/consumer 问题中使用的,它的元素是按照优先级排列的。由于 Java 不支持多个继承,所以你首先要决定的是 MyPriorityTransferQueue 类的基类。你扩展了 PriorityBlockingQueue 类,来实现在结构中插入数据按照优先级排序。你也实现了 TransferQueue 接口,添加了与 producer/consumer 相关的3个方法。

    MyPriortyTransferQueue 类有以下2个属性:

    AtomicInteger 属性,名为 counter: 此属性储存了正在等待从数据类型提取元素的消费者的数量。当一个消费者调用 take()操作来从数据类型中提取元素时,counter 数增加。当消费者结束 take() 操作的执行时,counter 数再次增加。在 hasWaitingConsumer() 和 getWaitingConsumerCount() 方法的实现中使用到了 counter。ReentrantLock 属性,名为 lock: 此属性是用来控制访问已实现的操作。只有一个线程可以用数据类型。最后一个,LinkedBlockingQueue list 用来储存传输的元素。

    在 MyPriorityTransferQueue 中,你实现了一些方法。全部方法都在 TransferQueue 接口中声明了和在PriorityBlockingQueue 接口实现的 take() 方法。在之前已经描述了2个方法了。来看看剩下的方法的描述:

    tryTransfer(E e): 此方法尝试直接发送元素给消费者。如果有消费者在等待,此方法储存元素到 priority queue 中为了立刻提供给消费者,并返回 true 值。如果没有消费者在等待,方法返回 false 值。transfer(E e): 此方法直接发送元素给消费者。如果有消费者在等待,此方法储存元素到 priority queue 中为了立刻提供给消费者。

    否则,把元素储存到已传输的元素list 并阻塞线程直到元素被消耗。当线程进入休眠时,你要释放锁,如果不的话,你就阻塞了queue。

    tryTransfer(E e, long timeout, TimeUnit unit): 此方法与 transfer() 方法相似,只是它的线程被阻塞的时间段是由参数决定的。当线程进入休眠时,你要释放锁,如果不的话,你就阻塞了queue。take(): 此方法返回下一个要被消耗的元素。如果在 transferred 元素list中有元素,就从list中取走元素。否则,就从 priority queue 中取元素。

    一旦你实现了数据类型,你就实现了 Event 类。它就是在数据类型里储存的元素构成的类。Event 类有2个属性用来储存生产者的ID和事件的优先级,并实现了 Comparable 接口,为了满足你的数据类型的需要。

    接着,你实现了 Producer 和 Consumer 类。在这个例子中,你有 10 个生产者和一个消费者,他们共享同一个 buffer。每个生产者生成100个事件,他们的优先级是递增的, 所以有高优先级的事件在越后面才生成。

    例子的主类创建了一个 MyPriorityTransferQueue 对象,10个生产者,和一个消费者,然后使用MyPriorityTransferQueue buffer 的 transfer() 方法来传输2个事件到 buffer。

    以下截图是程序运行的部分输出:

    你可以发现有着高级别的事件如何先被消费,和一个消费者如何消费传输的事件。

    参见第六章,并发集:Using blocking thread-safe lists ordered by priority第六章,并发集:Using blocking thread-safe lists

    文章转自 并发编程网-ifeve.com

    最新回复(0)