如果你向Executor提交了一个批处理任务,并且希望在它们完成后获得结果。为此你可以保存与每个任务相关联的Future,然后不断地调用timeout为零的get,来检验Future是否完成。这样做固然可以,但却相当乏味。幸运的是,还有一个更好的方法:完成服务(Completion service)。如果你向Executor提交了一个批处理任务,并且希望在它们完成后获得结果。为此你可以保存与每个任务相关联的Future,然后不断地调用timeout为零的get,来检验Future是否完成。这样做固然可以,但却相当乏味。幸运的是,还有一个更好的方法:完成服务(Completion service)。
CompletionService整合了Executor和BlockingQueue的功能。你可以将Callable任务提交给它去执行,然后使用类似于队列中的take和poll方法,在结果完整可用时获得这个结果,像一个打包的Future。ExecutorCompletionService是实现CompletionService接口的一个类,并将计算任务委托给一个Executor。
它是一个更高级的ExecutorService,它本身自带一个线程安全的线性表,无需用户额外创建。它提供了2种方法从线性表中取出结果,poll()是非阻塞的,若目前无结果,返回一个null,线程继续运行不阻塞。take()是阻塞的,若当前无结果,则线程阻塞,直到产生一个结果,被取出返回,线程才继续运行。
此类将安排那些完成时提交的任务,把它们结果放置在可使用 take 访问的队列上, 外部可以通过take(),poll(),poll(long timeout,TimeUnit unit)来取得。该类非常轻便,适合于在执行几组任务时临时使用。
主要构造函数public ExecutorCompletionService(Executor executor) 使用为执行基本任务而提供的执行程序创建一个 ExecutorCompletionService,并将 LinkedBlockingQueue 作为完成队列。 参数: executor - 要使用的执行程序 抛出: NullPointerException - 如果执行程序为 nullpublic ExecutorCompletionService(Executor executor,BlockingQueue<Future<V>> completionQueue) 使用为执行基本任务而提供的执行程序创建一个 ExecutorCompletionService,并将所提供的队列作为其完成队列。 参数: executor - 要使用的执行程序 completionQueue - 用作完成队列的队列,通常是专供此服务使用的队列 抛出: NullPointerException - 如果执行程序或 completionQueue 为 null
主要成员函数 public Future<V> submit(Callable<V> task) 从接口 CompletionService 复制的描述 提交要执行的值返回任务,并返回表示挂起的任务结果的 Future。在完成时,可能会提取或轮询此任务。 指定者: 接口 CompletionService<V> 中的 submit 参数: task - 要提交的任务 返回: 一个表示挂起的任务完成的 Future public Future<V> submit(Runnable task,V result) 从接口 CompletionService 复制的描述 提交要执行的 Runnable 任务,并返回一个表示任务完成的 Future,可以提取或轮询此任务。 指定者: 接口 CompletionService<V> 中的 submit 参数: task - 要提交的任务 result - 要返回的已成功完成任务的结果 返回: 一个表示挂起的任务完成的 Future,其 get() 方法将返回完成时给出的结果值 public Future<V> take() throws InterruptedException 从接口 CompletionService 复制的描述 获取并移除表示下一个已完成任务的 Future,如果目前不存在这样的任务,则等待。 指定者: 接口 CompletionService<V> 中的 take 返回: 表示下一个已完成任务的 Future 抛出: InterruptedException - 如果在等待时被中断 public Future<V> poll() 从接口 CompletionService 复制的描述 获取并移除表示下一个已完成任务的 Future,如果不存在这样的任务,则返回 null。 指定者: 接口 CompletionService<V> 中的 poll 返回: 表示下一个已完成任务的 Future;如果不存在这样的任务,则返回 null public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException 从接口 CompletionService 复制的描述 获取并移除表示下一个已完成任务的 Future,如果目前不存在这样的任务,则将等待指定的时间(如果有必要)。 指定者: 接口 CompletionService<V> 中的 poll 参数: timeout - 放弃之前需要等待的时间长度,以 unit 为时间单位 unit - 确定如何解释 timeout 参数的 TimeUnit 返回: 表示下一个已完成任务的 Future;如果等待了指定时间仍然不存在这样的任务,则返回 null 抛出: InterruptedException - 如果在等待时被中断 代码: import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; public class Test17 { public static void main(String[] args) throws Exception { Test17 t = new Test17(); t.count1(); t.count2(); } //使用阻塞容器保存每次Executor处理的结果,在后面进行统一处理 public void count1() throws Exception{ ExecutorService exec = Executors.newCachedThreadPool(); BlockingQueue<Future<Integer>> queue = new LinkedBlockingQueue<Future<Integer>>(); for(int i=0; i<10; i++){ Future<Integer> future =exec.submit(getTask()); queue.add(future); } int sum = 0; int queueSize = queue.size(); for(int i=0; i<queueSize; i++){ sum += queue.take().get(); } System.out.println("总数为:"+sum); exec.shutdown(); } //使用CompletionService(完成服务)保持Executor处理的结果 public void count2() throws InterruptedException, ExecutionException{ ExecutorService exec = Executors.newCachedThreadPool(); CompletionService<Integer> execcomp = new ExecutorCompletionService<Integer>(exec); for(int i=0; i<10; i++){ execcomp.submit(getTask()); } int sum = 0; for(int i=0; i<10; i++){ //检索并移除表示下一个已完成任务的 Future,如果目前不存在这样的任务,则等待。 Future<Integer> future = execcomp.take(); sum += future.get(); } System.out.println("总数为:"+sum); exec.shutdown(); } //得到一个任务 public Callable<Integer> getTask(){ final Random rand = new Random(); Callable<Integer> task = new Callable<Integer>(){ @Override public Integer call() throws Exception { int i = rand.nextInt(10); int j = rand.nextInt(10); int sum = i*j; System.out.print(sum+"\t"); return sum; } }; return task; } 结果:490 6 48 28 312 0 0 6 总数为:152 56 8 5425 35 10 12 854 20 总数为:282
相关资源:python入门教程(PDF版)